From 993de9a38bfe19c3dc0ee406d371b4b2bb750ec6 Mon Sep 17 00:00:00 2001 From: Wojciech Kozlowski Date: Sat, 21 Sep 2024 13:43:25 +0200 Subject: [PATCH] Some clean up --- src/main.rs | 4 +- src/tui/app/machine/fetch_state.rs | 23 +- .../lib/external/musicbrainz/daemon/mod.rs | 451 +++++++++--------- src/tui/mod.rs | 2 +- 4 files changed, 243 insertions(+), 237 deletions(-) diff --git a/src/main.rs b/src/main.rs index 347d373..a307943 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,7 @@ use musichoard::{ }; use tui::{ - App, EventChannel, EventHandler, EventListener, MusicBrainz, MusicBrainzDaemon, RequestChannel, + App, EventChannel, EventHandler, EventListener, JobChannel, MusicBrainz, MusicBrainzDaemon, Tui, Ui, }; @@ -94,7 +94,7 @@ fn with( let listener = EventListener::new(listener_sender); let handler = EventHandler::new(channel.receiver()); - let mb_request_channel = RequestChannel::new(); + let mb_request_channel = JobChannel::new(); thread::spawn(|| MusicBrainzDaemon::run(musicbrainz, mb_request_channel.receiver, app_sender)); let app = App::new(music_hoard, mb_request_channel.sender); diff --git a/src/tui/app/machine/fetch_state.rs b/src/tui/app/machine/fetch_state.rs index 8ab9532..56f80d6 100644 --- a/src/tui/app/machine/fetch_state.rs +++ b/src/tui/app/machine/fetch_state.rs @@ -1,4 +1,7 @@ -use std::{collections::VecDeque, sync::mpsc::{self, TryRecvError}}; +use std::{ + collections::VecDeque, + sync::mpsc::{self, TryRecvError}, +}; use musichoard::collection::{artist::Artist, musicbrainz::IMusicBrainzRef}; @@ -8,10 +11,7 @@ use crate::tui::{ AppPublicState, AppState, IAppEventFetch, IAppInteractFetch, MatchStateInfo, }, lib::{ - external::musicbrainz::daemon::{ - ApiParams, Job, JobInstance, JobPriority, ReturnSender, SearchArtistParams, - SearchParams, SearchReleaseGroupParams, - }, + external::musicbrainz::daemon::{ApiParams, Job, JobInstance, JobPriority, ResultSender}, interface::musicbrainz::api::Error as MbError, }, }; @@ -79,21 +79,18 @@ impl AppMachine { } } - fn submit_fetch_job(inner: &AppInner, artist: &Artist, tx: ReturnSender) { + fn submit_fetch_job(inner: &AppInner, artist: &Artist, tx: ResultSender) { let mut queue = VecDeque::new(); match artist.meta.musicbrainz { Some(ref arid) => { - let arid = arid.mbid().clone(); + let arid = arid.mbid(); for album in artist.albums.iter() { - queue.push_back(ApiParams::search(SearchParams::release_group( - SearchReleaseGroupParams::new(arid.clone(), album.meta.clone()), - ))); + let meta = &album.meta; + queue.push_back(ApiParams::search_release_group(arid.clone(), meta.clone())); } } None => { - queue.push_back(ApiParams::search(SearchParams::artist( - SearchArtistParams::new(artist.meta.clone()), - ))); + queue.push_back(ApiParams::search_artist(artist.meta.clone())); } } let job = Job::new(JobPriority::Background, JobInstance::new(tx, queue)); diff --git a/src/tui/lib/external/musicbrainz/daemon/mod.rs b/src/tui/lib/external/musicbrainz/daemon/mod.rs index dc812b9..eeafe7e 100644 --- a/src/tui/lib/external/musicbrainz/daemon/mod.rs +++ b/src/tui/lib/external/musicbrainz/daemon/mod.rs @@ -9,27 +9,20 @@ use crate::tui::{ }; pub enum Error { - RequestRecv(String), - Api(String), -} - -impl From for Error { - fn from(value: mpsc::RecvError) -> Self { - Error::RequestRecv(value.to_string()) - } -} - -impl From for Error { - fn from(value: ApiError) -> Self { - Error::Api(value.to_string()) - } + EventChannelDisconnected, + RequestChannelDisconnected, } pub struct MusicBrainzDaemon { - api: Box, - request_receiver: mpsc::Receiver, + musicbrainz: Box, + job_receiver: mpsc::Receiver, job_queue: JobQueue, - events: EventSender, + event_sender: EventSender, +} + +enum JobError { + JobQueueEmpty, + EventChannelDisconnected, } struct JobQueue { @@ -37,6 +30,226 @@ struct JobQueue { background_queue: VecDeque, } +pub struct Job { + priority: JobPriority, + instance: JobInstance, +} + +impl Job { + pub fn new(priority: JobPriority, instance: JobInstance) -> Self { + Job { priority, instance } + } +} + +pub enum JobPriority { + Foreground, + Background, +} + +pub type ResultSender = mpsc::Sender>; +pub struct JobInstance { + result_sender: ResultSender, + call_queue: VecDeque, +} + +enum JobInstanceStatus { + Continue, + Complete, +} + +enum JobInstanceError { + ReturnChannelDisconnected, + EventChannelDisconnected, +} + +impl JobInstance { + pub fn new(result_sender: ResultSender, call_queue: VecDeque) -> Self { + JobInstance { + result_sender, + call_queue, + } + } +} + +pub enum ApiParams { + Search(SearchParams), +} + +impl ApiParams { + pub fn search_artist(artist: ArtistMeta) -> Self { + ApiParams::Search(SearchParams::Artist(SearchArtistParams { artist })) + } + + pub fn search_release_group(arid: Mbid, album: AlbumMeta) -> Self { + ApiParams::Search(SearchParams::ReleaseGroup(SearchReleaseGroupParams { + arid, + album, + })) + } +} + +pub enum SearchParams { + Artist(SearchArtistParams), + ReleaseGroup(SearchReleaseGroupParams), +} + +pub struct SearchArtistParams { + artist: ArtistMeta, +} + +pub struct SearchReleaseGroupParams { + arid: Mbid, + album: AlbumMeta, +} + +pub struct JobChannel { + pub receiver: mpsc::Receiver, + pub sender: mpsc::Sender, +} + +impl JobChannel { + pub fn new() -> Self { + let (sender, receiver) = mpsc::channel(); + JobChannel { receiver, sender } + } +} + +impl MusicBrainzDaemon { + pub fn run( + musicbrainz: MB, + job_receiver: mpsc::Receiver, + event_sender: EventSender, + ) -> Result<(), Error> { + let daemon = MusicBrainzDaemon { + musicbrainz: Box::new(musicbrainz), + job_receiver, + job_queue: JobQueue::new(), + event_sender, + }; + daemon.main() + } + + fn main(mut self) -> Result<(), Error> { + loop { + self.enqueue_all_pending_jobs()?; + match self.execute_next_job() { + Ok(()) => { + // Sleep for one second. Required by MB API rate limiting. Assume all other + // processing takes negligible time such that regardless of how much other + // processing there is to be done, this one second sleep is necessary. + thread::sleep(time::Duration::from_secs(1)); + } + Err(JobError::JobQueueEmpty) => { + self.wait_for_jobs()?; + } + Err(JobError::EventChannelDisconnected) => { + return Err(Error::EventChannelDisconnected); + } + } + } + } + + fn enqueue_all_pending_jobs(&mut self) -> Result<(), Error> { + loop { + match self.job_receiver.try_recv() { + Ok(job) => self.job_queue.push_back(job), + Err(mpsc::TryRecvError::Empty) => return Ok(()), + Err(mpsc::TryRecvError::Disconnected) => { + return Err(Error::RequestChannelDisconnected); + } + } + } + } + + fn execute_next_job(&mut self) -> Result<(), JobError> { + if let Some(instance) = self.job_queue.front_mut() { + let result = instance.execute_next(&mut self.musicbrainz, &mut self.event_sender); + match result { + Ok(JobInstanceStatus::Continue) => {} + Ok(JobInstanceStatus::Complete) + | Err(JobInstanceError::ReturnChannelDisconnected) => { + self.job_queue.pop_front(); + } + Err(JobInstanceError::EventChannelDisconnected) => { + return Err(JobError::EventChannelDisconnected) + } + } + return Ok(()); + } + Err(JobError::JobQueueEmpty) + } + + fn wait_for_jobs(&mut self) -> Result<(), Error> { + assert!(self.job_queue.is_empty()); + + match self.job_receiver.recv() { + Ok(job) => self.job_queue.push_back(job), + Err(mpsc::RecvError) => return Err(Error::RequestChannelDisconnected), + } + + Ok(()) + } +} + +impl JobInstance { + fn execute_next( + &mut self, + musicbrainz: &mut Box, + event_sender: &mut EventSender, + ) -> Result { + // self.call_queue can be empty if the caller submits an empty job. + if let Some(params) = self.call_queue.pop_front() { + self.execute(musicbrainz, event_sender, params)? + } + + if self.call_queue.is_empty() { + Ok(JobInstanceStatus::Complete) + } else { + Ok(JobInstanceStatus::Continue) + } + } + + fn execute( + &mut self, + musicbrainz: &mut Box, + event_sender: &mut EventSender, + api_params: ApiParams, + ) -> Result<(), JobInstanceError> { + match api_params { + ApiParams::Search(search) => match search { + SearchParams::Artist(params) => { + let result = musicbrainz.search_artist(¶ms.artist); + let result = result.map(|list| MatchStateInfo::artist(params.artist, list)); + self.return_result(event_sender, result) + } + SearchParams::ReleaseGroup(params) => { + let result = musicbrainz.search_release_group(¶ms.arid, ¶ms.album); + let result = result.map(|list| MatchStateInfo::album(params.album, list)); + self.return_result(event_sender, result) + } + }, + } + } + + fn return_result( + &mut self, + event_sender: &mut EventSender, + result: Result, + ) -> Result<(), JobInstanceError> { + self.result_sender + .send(result) + .map_err(|_| JobInstanceError::ReturnChannelDisconnected)?; + + // If this send fails the event listener is dead. Don't panic as this function runs in a + // detached thread so this might be happening during normal shut down. + event_sender + .send(Event::FetchResultReady) + .map_err(|_| JobInstanceError::EventChannelDisconnected)?; + + Ok(()) + } +} + impl JobQueue { fn new() -> Self { JobQueue { @@ -68,207 +281,3 @@ impl JobQueue { } } } - -pub struct Job { - priority: JobPriority, - instance: JobInstance, -} - -impl Job { - pub fn new(priority: JobPriority, instance: JobInstance) -> Self { - Job { priority, instance } - } -} - -pub enum JobPriority { - Foreground, - Background, -} - -pub type ReturnSender = mpsc::Sender>; -pub struct JobInstance { - return_sender: ReturnSender, - call_queue: VecDeque, -} - -impl JobInstance { - pub fn new(return_sender: ReturnSender, call_queue: VecDeque) -> Self { - JobInstance { - return_sender, - call_queue, - } - } -} - -pub enum ApiParams { - Search(SearchParams), -} - -impl ApiParams { - pub fn search(params: SearchParams) -> Self { - ApiParams::Search(params) - } -} - -pub enum SearchParams { - Artist(SearchArtistParams), - ReleaseGroup(SearchReleaseGroupParams), -} - -impl SearchParams { - pub fn artist(params: SearchArtistParams) -> Self { - SearchParams::Artist(params) - } - - pub fn release_group(params: SearchReleaseGroupParams) -> Self { - SearchParams::ReleaseGroup(params) - } -} - -pub struct SearchArtistParams { - artist: ArtistMeta, -} - -impl SearchArtistParams { - pub fn new(artist: ArtistMeta) -> Self { - SearchArtistParams { artist } - } -} - -pub struct SearchReleaseGroupParams { - arid: Mbid, - album: AlbumMeta, -} - -impl SearchReleaseGroupParams { - pub fn new(arid: Mbid, album: AlbumMeta) -> Self { - SearchReleaseGroupParams { arid, album } - } -} - -pub struct RequestChannel { - pub receiver: mpsc::Receiver, - pub sender: mpsc::Sender, -} - -impl RequestChannel { - pub fn new() -> Self { - let (sender, receiver) = mpsc::channel(); - RequestChannel { receiver, sender } - } -} - -impl MusicBrainzDaemon { - pub fn run( - api: MB, - request_receiver: mpsc::Receiver, - events: EventSender, - ) -> Result<(), Error> { - let daemon = MusicBrainzDaemon { - api: Box::new(api), - request_receiver, - job_queue: JobQueue::new(), - events, - }; - daemon.main() - } - - fn wait_for_jobs(&mut self) -> Result<(), Error> { - if self.job_queue.is_empty() { - self.job_queue.push_back(self.request_receiver.recv()?); - } - Ok(()) - } - - fn enqueue_all_pending_jobs(&mut self) -> Result<(), Error> { - loop { - match self.request_receiver.try_recv() { - Ok(job) => self.job_queue.push_back(job), - Err(mpsc::TryRecvError::Empty) => return Ok(()), - Err(mpsc::TryRecvError::Disconnected) => { - return Err(Error::RequestRecv( - mpsc::TryRecvError::Disconnected.to_string(), - )) - } - } - } - } - - fn execute_next_job(&mut self) -> Option<()> { - if let Some(instance) = self.job_queue.front_mut() { - let result = instance.execute_next(&mut self.api, &mut self.events); - if result.is_none() { - self.job_queue.pop_front(); - } - return Some(()); - } - None - } - - fn main(mut self) -> Result<(), Error> { - loop { - self.wait_for_jobs()?; - self.enqueue_all_pending_jobs()?; - if let Some(()) = self.execute_next_job() { - // Sleep for one second. Required by MB API rate limiting. Assume all other - // processing takes negligible time such that regardless of how much other - // processing there is to be done, this one second sleep is necessary. - thread::sleep(time::Duration::from_secs(1)); - } - } - } -} - -impl JobInstance { - fn execute_next( - &mut self, - api: &mut Box, - events: &mut EventSender, - ) -> Option<()> { - self.call_queue - .pop_front() - .map(|api_params| self.execute_call(api, events, api_params)); - if !self.call_queue.is_empty() { - Some(()) - } else { - None - } - } - - fn execute_call( - &mut self, - api: &mut Box, - events: &mut EventSender, - api_params: ApiParams, - ) { - match api_params { - ApiParams::Search(search) => match search { - SearchParams::Artist(params) => { - let result = api.search_artist(¶ms.artist); - let result = result.map(|list| MatchStateInfo::artist(params.artist, list)); - self.return_result(events, result).ok(); - } - SearchParams::ReleaseGroup(params) => { - let result = api.search_release_group(¶ms.arid, ¶ms.album); - let result = result.map(|list| MatchStateInfo::album(params.album, list)); - self.return_result(events, result).ok(); - } - }, - } - } - - fn return_result( - &mut self, - events: &mut EventSender, - result: Result, - ) -> Result<(), ()> { - // If receiver disconnects just drop the rest. - self.return_sender.send(result).map_err(|_| ())?; - - // If this send fails the event listener is dead. Don't panic as this function runs in a - // detached thread so this might be happening during normal shut down. - events.send(Event::FetchResultReady).map_err(|_| ())?; - - Ok(()) - } -} diff --git a/src/tui/mod.rs b/src/tui/mod.rs index 3d73f9e..10c3661 100644 --- a/src/tui/mod.rs +++ b/src/tui/mod.rs @@ -10,7 +10,7 @@ pub use event::EventChannel; pub use handler::EventHandler; pub use lib::external::musicbrainz::{ api::MusicBrainz, - daemon::{MusicBrainzDaemon, RequestChannel}, + daemon::{JobChannel, MusicBrainzDaemon}, }; pub use listener::EventListener; pub use ui::Ui;