Compare commits

..

No commits in common. "a67e6b937f0c76987c0bddc830954e7e3ab9d93a" and "21d0034b973cf423ffba12aa73aaffd464b0eb9f" have entirely different histories.

8 changed files with 283 additions and 372 deletions

View File

@ -26,7 +26,7 @@ use musichoard::{
}; };
use tui::{ use tui::{
App, EventChannel, EventHandler, EventListener, JobChannel, MusicBrainz, MusicBrainzDaemon, App, EventChannel, EventHandler, EventListener, MusicBrainz, MusicBrainzDaemon, RequestChannel,
Tui, Ui, Tui, Ui,
}; };
@ -94,13 +94,13 @@ fn with<Database: IDatabase + 'static, Library: ILibrary + 'static>(
let listener = EventListener::new(listener_sender); let listener = EventListener::new(listener_sender);
let handler = EventHandler::new(channel.receiver()); let handler = EventHandler::new(channel.receiver());
let mb_job_channel = JobChannel::new(); let mb_request_channel = RequestChannel::new();
thread::spawn(|| MusicBrainzDaemon::run(musicbrainz, mb_request_channel.receiver, app_sender));
let app = App::new(music_hoard, mb_job_channel.sender()); let app = App::new(music_hoard, mb_request_channel.sender);
let ui = Ui; let ui = Ui;
// Run the TUI application. // Run the TUI application.
thread::spawn(|| MusicBrainzDaemon::run(musicbrainz, mb_job_channel.receiver(), app_sender));
Tui::run(terminal, app, ui, handler, listener).expect("failed to run tui"); Tui::run(terminal, app, ui, handler, listener).expect("failed to run tui");
} }

View File

@ -1,17 +1,18 @@
use std::{ use std::{collections::VecDeque, sync::mpsc::{self, TryRecvError}};
collections::VecDeque,
sync::mpsc::{self, TryRecvError},
};
use musichoard::collection::{artist::Artist, musicbrainz::IMusicBrainzRef}; use musichoard::collection::{artist::Artist, musicbrainz::IMusicBrainzRef};
use crate::tui::{ use crate::tui::{
app::{ app::{
machine::{match_state::MatchState, App, AppInner, AppMachine}, machine::{match_state::MatchState, App, AppInner, AppMachine},
AppPublicState, AppState, IAppEventFetch, IAppInteractFetch, AppPublicState, AppState, IAppEventFetch, IAppInteractFetch, MatchStateInfo,
}, },
lib::external::musicbrainz::daemon::{ lib::{
Error as DaemonError, IMbJobSender, MbApiResult, MbParams, ResultSender, external::musicbrainz::daemon::{
ApiParams, Job, JobInstance, JobPriority, ReturnSender, SearchArtistParams,
SearchParams, SearchReleaseGroupParams,
},
interface::musicbrainz::api::Error as MbError,
}, },
}; };
@ -19,13 +20,16 @@ pub struct FetchState {
fetch_rx: FetchReceiver, fetch_rx: FetchReceiver,
} }
pub type FetchReceiver = mpsc::Receiver<MbApiResult>;
impl FetchState { impl FetchState {
pub fn new(fetch_rx: FetchReceiver) -> Self { pub fn new(fetch_rx: FetchReceiver) -> Self {
FetchState { fetch_rx } FetchState { fetch_rx }
} }
} }
pub type FetchError = MbError;
pub type FetchResult = Result<MatchStateInfo, FetchError>;
pub type FetchReceiver = mpsc::Receiver<FetchResult>;
impl AppMachine<FetchState> { impl AppMachine<FetchState> {
fn fetch_state(inner: AppInner, state: FetchState) -> Self { fn fetch_state(inner: AppInner, state: FetchState) -> Self {
AppMachine::new(inner, state) AppMachine::new(inner, state)
@ -40,10 +44,8 @@ impl AppMachine<FetchState> {
} }
}; };
let (fetch_tx, fetch_rx) = mpsc::channel::<MbApiResult>(); let (fetch_tx, fetch_rx) = mpsc::channel::<FetchResult>();
if let Err(err) = Self::submit_fetch_job(&inner.musicbrainz, fetch_tx, artist) { Self::submit_fetch_job(&inner, artist, fetch_tx);
return AppMachine::error_state(inner, err.to_string()).into();
}
let fetch = FetchState::new(fetch_rx); let fetch = FetchState::new(fetch_rx);
AppMachine::app_fetch(inner, fetch, true) AppMachine::app_fetch(inner, fetch, true)
@ -77,22 +79,25 @@ impl AppMachine<FetchState> {
} }
} }
fn submit_fetch_job( fn submit_fetch_job(inner: &AppInner, artist: &Artist, tx: ReturnSender) {
musicbrainz: &Box<dyn IMbJobSender>, let mut queue = VecDeque::new();
result_sender: ResultSender, match artist.meta.musicbrainz {
artist: &Artist,
) -> Result<(), DaemonError> {
let requests = match artist.meta.musicbrainz {
Some(ref arid) => { Some(ref arid) => {
let arid = arid.mbid(); let arid = arid.mbid().clone();
let albums = artist.albums.iter(); for album in artist.albums.iter() {
albums queue.push_back(ApiParams::search(SearchParams::release_group(
.map(|album| MbParams::search_release_group(arid.clone(), album.meta.clone())) SearchReleaseGroupParams::new(arid.clone(), album.meta.clone()),
.collect() )));
}
} }
None => VecDeque::from([MbParams::search_artist(artist.meta.clone())]), None => {
}; queue.push_back(ApiParams::search(SearchParams::artist(
musicbrainz.submit_background_job(result_sender, requests) SearchArtistParams::new(artist.meta.clone()),
)));
}
}
let job = Job::new(JobPriority::Background, JobInstance::new(tx, queue));
inner.musicbrainz.send(job);
} }
} }

View File

@ -8,12 +8,14 @@ mod match_state;
mod reload_state; mod reload_state;
mod search_state; mod search_state;
use std::sync::mpsc;
use crate::tui::{ use crate::tui::{
app::{ app::{
selection::Selection, AppMode, AppPublic, AppPublicInner, AppPublicState, AppState, IApp, selection::Selection, AppMode, AppPublic, AppPublicInner, AppPublicState, AppState, IApp,
IAppAccess, IAppBase, IAppState, IAppAccess, IAppBase, IAppState,
}, },
lib::{external::musicbrainz::daemon::IMbJobSender, IMusicHoard}, lib::{external::musicbrainz::daemon::Job, IMusicHoard},
}; };
use browse_state::BrowseState; use browse_state::BrowseState;
@ -46,7 +48,7 @@ pub struct AppMachine<STATE> {
pub struct AppInner { pub struct AppInner {
running: bool, running: bool,
music_hoard: Box<dyn IMusicHoard>, music_hoard: Box<dyn IMusicHoard>,
musicbrainz: Box<dyn IMbJobSender>, musicbrainz: mpsc::Sender<Job>,
selection: Selection, selection: Selection,
} }
@ -81,9 +83,9 @@ macro_rules! app_field_mut {
} }
impl App { impl App {
pub fn new<MH: IMusicHoard + 'static, MB: IMbJobSender + 'static>( pub fn new<MH: IMusicHoard + 'static>(
mut music_hoard: MH, mut music_hoard: MH,
musicbrainz: MB, musicbrainz: mpsc::Sender<Job>,
) -> Self { ) -> Self {
let init_result = Self::init(&mut music_hoard); let init_result = Self::init(&mut music_hoard);
let inner = AppInner::new(music_hoard, musicbrainz); let inner = AppInner::new(music_hoard, musicbrainz);
@ -169,15 +171,12 @@ impl IAppAccess for App {
} }
impl AppInner { impl AppInner {
pub fn new<MH: IMusicHoard + 'static, MB: IMbJobSender + 'static>( pub fn new<MH: IMusicHoard + 'static>(music_hoard: MH, musicbrainz: mpsc::Sender<Job>) -> Self {
music_hoard: MH,
musicbrainz: MB,
) -> Self {
let selection = Selection::new(music_hoard.get_collection()); let selection = Selection::new(music_hoard.get_collection());
AppInner { AppInner {
running: true, running: true,
music_hoard: Box::new(music_hoard), music_hoard: Box::new(music_hoard),
musicbrainz: Box::new(musicbrainz), musicbrainz,
selection, selection,
} }
} }

View File

@ -42,7 +42,7 @@ impl From<mpsc::TryRecvError> for EventError {
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Event { pub enum Event {
Key(KeyEvent), Key(KeyEvent),
FetchComplete, FetchResultReady,
} }
pub struct EventChannel { pub struct EventChannel {
@ -50,14 +50,6 @@ pub struct EventChannel {
receiver: mpsc::Receiver<Event>, receiver: mpsc::Receiver<Event>,
} }
pub trait IKeyEventSender {
fn send_key(&self, key_event: KeyEvent) -> Result<(), EventError>;
}
pub trait IFetchCompleteEventSender {
fn send_fetch_complete(&self) -> Result<(), EventError>;
}
#[derive(Clone)] #[derive(Clone)]
pub struct EventSender { pub struct EventSender {
sender: mpsc::Sender<Event>, sender: mpsc::Sender<Event>,
@ -86,15 +78,9 @@ impl EventChannel {
} }
} }
impl IKeyEventSender for EventSender { impl EventSender {
fn send_key(&self, key_event: KeyEvent) -> Result<(), EventError> { pub fn send(&self, event: Event) -> Result<(), EventError> {
Ok(self.sender.send(Event::Key(key_event))?) Ok(self.sender.send(event)?)
}
}
impl IFetchCompleteEventSender for EventSender {
fn send_fetch_complete(&self) -> Result<(), EventError> {
Ok(self.sender.send(Event::FetchComplete)?)
} }
} }
@ -122,13 +108,13 @@ mod tests {
let channel = EventChannel::new(); let channel = EventChannel::new();
let sender = channel.sender(); let sender = channel.sender();
let receiver = channel.receiver(); let receiver = channel.receiver();
let key_event = KeyEvent::new(KeyCode::Up, KeyModifiers::empty()); let event = Event::Key(KeyEvent::new(KeyCode::Up, KeyModifiers::empty()));
let result = sender.send_key(key_event); let result = sender.send(event);
assert!(result.is_ok()); assert!(result.is_ok());
drop(receiver); drop(receiver);
let result = sender.send_key(key_event); let result = sender.send(event);
assert!(result.is_err()); assert!(result.is_err());
} }
@ -137,9 +123,9 @@ mod tests {
let channel = EventChannel::new(); let channel = EventChannel::new();
let sender = channel.sender(); let sender = channel.sender();
let receiver = channel.receiver(); let receiver = channel.receiver();
let key_event = KeyEvent::new(KeyCode::Up, KeyModifiers::empty()); let event = Event::Key(KeyEvent::new(KeyCode::Up, KeyModifiers::empty()));
sender.send_key(key_event).unwrap(); sender.send(event).unwrap();
let result = receiver.recv(); let result = receiver.recv();
assert!(result.is_ok()); assert!(result.is_ok());
@ -153,11 +139,12 @@ mod tests {
let channel = EventChannel::new(); let channel = EventChannel::new();
let sender = channel.sender(); let sender = channel.sender();
let receiver = channel.receiver(); let receiver = channel.receiver();
let event = Event::FetchResultReady;
let result = receiver.try_recv(); let result = receiver.try_recv();
assert!(result.is_err()); assert!(result.is_err());
sender.send_fetch_complete().unwrap(); sender.send(event).unwrap();
let result = receiver.try_recv(); let result = receiver.try_recv();
assert!(result.is_ok()); assert!(result.is_ok());

View File

@ -28,7 +28,7 @@ trait IEventHandlerPrivate<APP: IApp> {
fn handle_error_key_event(app: <APP as IApp>::ErrorState, key_event: KeyEvent) -> APP; fn handle_error_key_event(app: <APP as IApp>::ErrorState, key_event: KeyEvent) -> APP;
fn handle_critical_key_event(app: <APP as IApp>::CriticalState, key_event: KeyEvent) -> APP; fn handle_critical_key_event(app: <APP as IApp>::CriticalState, key_event: KeyEvent) -> APP;
fn handle_fetch_complete_event(app: APP) -> APP; fn handle_fetch_result_ready_event(app: APP) -> APP;
fn handle_input_key_event<Input: IAppInput<APP = APP>>(app: Input, key_event: KeyEvent) -> APP; fn handle_input_key_event<Input: IAppInput<APP = APP>>(app: Input, key_event: KeyEvent) -> APP;
} }
@ -48,7 +48,7 @@ impl<APP: IApp> IEventHandler<APP> for EventHandler {
fn handle_next_event(&self, app: APP) -> Result<APP, EventError> { fn handle_next_event(&self, app: APP) -> Result<APP, EventError> {
Ok(match self.events.recv()? { Ok(match self.events.recv()? {
Event::Key(key_event) => Self::handle_key_event(app, key_event), Event::Key(key_event) => Self::handle_key_event(app, key_event),
Event::FetchComplete => Self::handle_fetch_complete_event(app), Event::FetchResultReady => Self::handle_fetch_result_ready_event(app),
}) })
} }
} }
@ -92,7 +92,7 @@ impl<APP: IApp> IEventHandlerPrivate<APP> for EventHandler {
} }
} }
fn handle_fetch_complete_event(app: APP) -> APP { fn handle_fetch_result_ready_event(app: APP) -> APP {
match app.state() { match app.state() {
AppState::Browse(state) => state.no_op(), AppState::Browse(state) => state.no_op(),
AppState::Info(state) => state.no_op(), AppState::Info(state) => state.no_op(),

View File

@ -1,37 +1,35 @@
use std::{collections::VecDeque, fmt, sync::mpsc, thread, time}; use std::{collections::VecDeque, sync::mpsc, thread, time};
use musichoard::collection::{album::AlbumMeta, artist::ArtistMeta, musicbrainz::Mbid}; use musichoard::collection::{album::AlbumMeta, artist::ArtistMeta, musicbrainz::Mbid};
use crate::tui::{ use crate::tui::{
app::MatchStateInfo, app::MatchStateInfo,
event::IFetchCompleteEventSender, event::{Event, EventSender},
lib::interface::musicbrainz::api::{Error as ApiError, IMusicBrainz}, lib::interface::musicbrainz::api::{Error as ApiError, IMusicBrainz},
}; };
pub enum Error { pub enum Error {
EventChannelDisconnected, RequestRecv(String),
JobChannelDisconnected, Api(String),
} }
impl fmt::Display for Error { impl From<mpsc::RecvError> for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn from(value: mpsc::RecvError) -> Self {
match self { Error::RequestRecv(value.to_string())
Error::EventChannelDisconnected => write!(f, "the event channel is disconnected"), }
Error::JobChannelDisconnected => write!(f, "the job channel is disconnected"), }
}
impl From<ApiError> for Error {
fn from(value: ApiError) -> Self {
Error::Api(value.to_string())
} }
} }
pub struct MusicBrainzDaemon { pub struct MusicBrainzDaemon {
musicbrainz: Box<dyn IMusicBrainz>, api: Box<dyn IMusicBrainz>,
job_receiver: mpsc::Receiver<Job>, request_receiver: mpsc::Receiver<Job>,
job_queue: JobQueue, job_queue: JobQueue,
event_sender: Box<dyn IFetchCompleteEventSender>, events: EventSender,
}
enum JobError {
JobQueueEmpty,
EventChannelDisconnected,
} }
struct JobQueue { struct JobQueue {
@ -39,286 +37,6 @@ struct JobQueue {
background_queue: VecDeque<JobInstance>, background_queue: VecDeque<JobInstance>,
} }
struct Job {
priority: JobPriority,
instance: JobInstance,
}
impl Job {
pub fn new(priority: JobPriority, instance: JobInstance) -> Self {
Job { priority, instance }
}
}
enum JobPriority {
Foreground,
Background,
}
pub type MbApiResult = Result<MatchStateInfo, ApiError>;
pub type ResultSender = mpsc::Sender<MbApiResult>;
struct JobInstance {
result_sender: ResultSender,
requests: VecDeque<MbParams>,
}
enum JobInstanceStatus {
Continue,
Complete,
}
enum JobInstanceError {
ReturnChannelDisconnected,
EventChannelDisconnected,
}
impl JobInstance {
fn new(result_sender: ResultSender, requests: VecDeque<MbParams>) -> Self {
JobInstance {
result_sender,
requests,
}
}
}
pub enum MbParams {
Search(SearchParams),
}
impl MbParams {
pub fn search_artist(artist: ArtistMeta) -> Self {
MbParams::Search(SearchParams::Artist(SearchArtistParams { artist }))
}
pub fn search_release_group(arid: Mbid, album: AlbumMeta) -> Self {
MbParams::Search(SearchParams::ReleaseGroup(SearchReleaseGroupParams {
arid,
album,
}))
}
}
pub enum SearchParams {
Artist(SearchArtistParams),
ReleaseGroup(SearchReleaseGroupParams),
}
pub struct SearchArtistParams {
artist: ArtistMeta,
}
pub struct SearchReleaseGroupParams {
arid: Mbid,
album: AlbumMeta,
}
pub struct JobChannel {
sender: mpsc::Sender<Job>,
receiver: mpsc::Receiver<Job>,
}
pub struct JobSender {
sender: mpsc::Sender<Job>,
}
pub struct JobReceiver {
receiver: mpsc::Receiver<Job>,
}
impl JobChannel {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel();
JobChannel { receiver, sender }
}
pub fn sender(&self) -> JobSender {
JobSender {
sender: self.sender.clone(),
}
}
pub fn receiver(self) -> JobReceiver {
JobReceiver {
receiver: self.receiver,
}
}
}
pub trait IMbJobSender {
fn submit_background_job(
&self,
result_sender: ResultSender,
requests: VecDeque<MbParams>,
) -> Result<(), Error>;
}
impl IMbJobSender for JobSender {
fn submit_background_job(
&self,
result_sender: ResultSender,
requests: VecDeque<MbParams>,
) -> Result<(), Error> {
self.send_background_job(result_sender, requests)
}
}
impl JobSender {
fn send_background_job(
&self,
result_sender: ResultSender,
requests: VecDeque<MbParams>,
) -> Result<(), Error> {
self.send_job(JobPriority::Background, result_sender, requests)
}
fn send_job(
&self,
priority: JobPriority,
result_sender: ResultSender,
requests: VecDeque<MbParams>,
) -> Result<(), Error> {
let instance = JobInstance::new(result_sender, requests);
let job = Job::new(priority, instance);
self.sender.send(job).or(Err(Error::JobChannelDisconnected))
}
}
impl MusicBrainzDaemon {
pub fn run<MB: IMusicBrainz + 'static, ES: IFetchCompleteEventSender + 'static>(
musicbrainz: MB,
job_receiver: JobReceiver,
event_sender: ES,
) -> Result<(), Error> {
let daemon = MusicBrainzDaemon {
musicbrainz: Box::new(musicbrainz),
job_receiver: job_receiver.receiver,
job_queue: JobQueue::new(),
event_sender: Box::new(event_sender),
};
daemon.main()
}
fn main(mut self) -> Result<(), Error> {
loop {
self.enqueue_all_pending_jobs()?;
match self.execute_next_job() {
Ok(()) => {
// Sleep for one second. Required by MB API rate limiting. Assume all other
// processing takes negligible time such that regardless of how much other
// processing there is to be done, this one second sleep is necessary.
thread::sleep(time::Duration::from_secs(1));
}
Err(JobError::JobQueueEmpty) => {
self.wait_for_jobs()?;
}
Err(JobError::EventChannelDisconnected) => {
return Err(Error::EventChannelDisconnected);
}
}
}
}
fn enqueue_all_pending_jobs(&mut self) -> Result<(), Error> {
loop {
match self.job_receiver.try_recv() {
Ok(job) => self.job_queue.push_back(job),
Err(mpsc::TryRecvError::Empty) => return Ok(()),
Err(mpsc::TryRecvError::Disconnected) => {
return Err(Error::JobChannelDisconnected);
}
}
}
}
fn execute_next_job(&mut self) -> Result<(), JobError> {
if let Some(instance) = self.job_queue.front_mut() {
let result = instance.execute_next(&mut self.musicbrainz, &mut self.event_sender);
match result {
Ok(JobInstanceStatus::Continue) => {}
Ok(JobInstanceStatus::Complete)
| Err(JobInstanceError::ReturnChannelDisconnected) => {
self.job_queue.pop_front();
}
Err(JobInstanceError::EventChannelDisconnected) => {
return Err(JobError::EventChannelDisconnected)
}
}
return Ok(());
}
Err(JobError::JobQueueEmpty)
}
fn wait_for_jobs(&mut self) -> Result<(), Error> {
assert!(self.job_queue.is_empty());
match self.job_receiver.recv() {
Ok(job) => self.job_queue.push_back(job),
Err(mpsc::RecvError) => return Err(Error::JobChannelDisconnected),
}
Ok(())
}
}
impl JobInstance {
fn execute_next(
&mut self,
musicbrainz: &mut Box<dyn IMusicBrainz>,
event_sender: &mut Box<dyn IFetchCompleteEventSender>,
) -> Result<JobInstanceStatus, JobInstanceError> {
// self.requests can be empty if the caller submits an empty job.
if let Some(params) = self.requests.pop_front() {
self.execute(musicbrainz, event_sender, params)?
}
if self.requests.is_empty() {
Ok(JobInstanceStatus::Complete)
} else {
Ok(JobInstanceStatus::Continue)
}
}
fn execute(
&mut self,
musicbrainz: &mut Box<dyn IMusicBrainz>,
event_sender: &mut Box<dyn IFetchCompleteEventSender>,
api_params: MbParams,
) -> Result<(), JobInstanceError> {
match api_params {
MbParams::Search(search) => match search {
SearchParams::Artist(params) => {
let result = musicbrainz.search_artist(&params.artist);
let result = result.map(|list| MatchStateInfo::artist(params.artist, list));
self.return_result(event_sender, result)
}
SearchParams::ReleaseGroup(params) => {
let result = musicbrainz.search_release_group(&params.arid, &params.album);
let result = result.map(|list| MatchStateInfo::album(params.album, list));
self.return_result(event_sender, result)
}
},
}
}
fn return_result(
&mut self,
event_sender: &mut Box<dyn IFetchCompleteEventSender>,
result: Result<MatchStateInfo, ApiError>,
) -> Result<(), JobInstanceError> {
self.result_sender
.send(result)
.map_err(|_| JobInstanceError::ReturnChannelDisconnected)?;
// If this send fails the event listener is dead. Don't panic as this function runs in a
// detached thread so this might be happening during normal shut down.
event_sender
.send_fetch_complete()
.map_err(|_| JobInstanceError::EventChannelDisconnected)?;
Ok(())
}
}
impl JobQueue { impl JobQueue {
fn new() -> Self { fn new() -> Self {
JobQueue { JobQueue {
@ -350,3 +68,207 @@ impl JobQueue {
} }
} }
} }
pub struct Job {
priority: JobPriority,
instance: JobInstance,
}
impl Job {
pub fn new(priority: JobPriority, instance: JobInstance) -> Self {
Job { priority, instance }
}
}
pub enum JobPriority {
Foreground,
Background,
}
pub type ReturnSender = mpsc::Sender<Result<MatchStateInfo, ApiError>>;
pub struct JobInstance {
return_sender: ReturnSender,
call_queue: VecDeque<ApiParams>,
}
impl JobInstance {
pub fn new(return_sender: ReturnSender, call_queue: VecDeque<ApiParams>) -> Self {
JobInstance {
return_sender,
call_queue,
}
}
}
pub enum ApiParams {
Search(SearchParams),
}
impl ApiParams {
pub fn search(params: SearchParams) -> Self {
ApiParams::Search(params)
}
}
pub enum SearchParams {
Artist(SearchArtistParams),
ReleaseGroup(SearchReleaseGroupParams),
}
impl SearchParams {
pub fn artist(params: SearchArtistParams) -> Self {
SearchParams::Artist(params)
}
pub fn release_group(params: SearchReleaseGroupParams) -> Self {
SearchParams::ReleaseGroup(params)
}
}
pub struct SearchArtistParams {
artist: ArtistMeta,
}
impl SearchArtistParams {
pub fn new(artist: ArtistMeta) -> Self {
SearchArtistParams { artist }
}
}
pub struct SearchReleaseGroupParams {
arid: Mbid,
album: AlbumMeta,
}
impl SearchReleaseGroupParams {
pub fn new(arid: Mbid, album: AlbumMeta) -> Self {
SearchReleaseGroupParams { arid, album }
}
}
pub struct RequestChannel {
pub receiver: mpsc::Receiver<Job>,
pub sender: mpsc::Sender<Job>,
}
impl RequestChannel {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel();
RequestChannel { receiver, sender }
}
}
impl MusicBrainzDaemon {
pub fn run<MB: IMusicBrainz + 'static>(
api: MB,
request_receiver: mpsc::Receiver<Job>,
events: EventSender,
) -> Result<(), Error> {
let daemon = MusicBrainzDaemon {
api: Box::new(api),
request_receiver,
job_queue: JobQueue::new(),
events,
};
daemon.main()
}
fn wait_for_jobs(&mut self) -> Result<(), Error> {
if self.job_queue.is_empty() {
self.job_queue.push_back(self.request_receiver.recv()?);
}
Ok(())
}
fn enqueue_all_pending_jobs(&mut self) -> Result<(), Error> {
loop {
match self.request_receiver.try_recv() {
Ok(job) => self.job_queue.push_back(job),
Err(mpsc::TryRecvError::Empty) => return Ok(()),
Err(mpsc::TryRecvError::Disconnected) => {
return Err(Error::RequestRecv(
mpsc::TryRecvError::Disconnected.to_string(),
))
}
}
}
}
fn execute_next_job(&mut self) -> Option<()> {
if let Some(instance) = self.job_queue.front_mut() {
let result = instance.execute_next(&mut self.api, &mut self.events);
if result.is_none() {
self.job_queue.pop_front();
}
return Some(());
}
None
}
fn main(mut self) -> Result<(), Error> {
loop {
self.wait_for_jobs()?;
self.enqueue_all_pending_jobs()?;
if let Some(()) = self.execute_next_job() {
// Sleep for one second. Required by MB API rate limiting. Assume all other
// processing takes negligible time such that regardless of how much other
// processing there is to be done, this one second sleep is necessary.
thread::sleep(time::Duration::from_secs(1));
}
}
}
}
impl JobInstance {
fn execute_next(
&mut self,
api: &mut Box<dyn IMusicBrainz>,
events: &mut EventSender,
) -> Option<()> {
self.call_queue
.pop_front()
.map(|api_params| self.execute_call(api, events, api_params));
if !self.call_queue.is_empty() {
Some(())
} else {
None
}
}
fn execute_call(
&mut self,
api: &mut Box<dyn IMusicBrainz>,
events: &mut EventSender,
api_params: ApiParams,
) {
match api_params {
ApiParams::Search(search) => match search {
SearchParams::Artist(params) => {
let result = api.search_artist(&params.artist);
let result = result.map(|list| MatchStateInfo::artist(params.artist, list));
self.return_result(events, result).ok();
}
SearchParams::ReleaseGroup(params) => {
let result = api.search_release_group(&params.arid, &params.album);
let result = result.map(|list| MatchStateInfo::album(params.album, list));
self.return_result(events, result).ok();
}
},
}
}
fn return_result(
&mut self,
events: &mut EventSender,
result: Result<MatchStateInfo, ApiError>,
) -> Result<(), ()> {
// If receiver disconnects just drop the rest.
self.return_sender.send(result).map_err(|_| ())?;
// If this send fails the event listener is dead. Don't panic as this function runs in a
// detached thread so this might be happening during normal shut down.
events.send(Event::FetchResultReady).map_err(|_| ())?;
Ok(())
}
}

View File

@ -4,7 +4,7 @@ use std::thread;
#[cfg(test)] #[cfg(test)]
use mockall::automock; use mockall::automock;
use crate::tui::event::{EventError, IKeyEventSender}; use crate::tui::event::{Event, EventError, EventSender};
#[cfg_attr(test, automock)] #[cfg_attr(test, automock)]
pub trait IEventListener { pub trait IEventListener {
@ -12,15 +12,13 @@ pub trait IEventListener {
} }
pub struct EventListener { pub struct EventListener {
event_sender: Box<dyn IKeyEventSender + Send>, events: EventSender,
} }
// GRCOV_EXCL_START // GRCOV_EXCL_START
impl EventListener { impl EventListener {
pub fn new<ES: IKeyEventSender + Send + 'static>(event_sender: ES) -> Self { pub fn new(events: EventSender) -> Self {
EventListener { EventListener { events }
event_sender: Box::new(event_sender),
}
} }
} }
@ -34,7 +32,7 @@ impl IEventListener for EventListener {
match event::read() { match event::read() {
Ok(event) => { Ok(event) => {
if let Err(err) = match event { if let Err(err) = match event {
CrosstermEvent::Key(e) => self.event_sender.send_key(e), CrosstermEvent::Key(e) => self.events.send(Event::Key(e)),
_ => Ok(()), _ => Ok(()),
} { } {
return err; return err;

View File

@ -10,7 +10,7 @@ pub use event::EventChannel;
pub use handler::EventHandler; pub use handler::EventHandler;
pub use lib::external::musicbrainz::{ pub use lib::external::musicbrainz::{
api::MusicBrainz, api::MusicBrainz,
daemon::{JobChannel, MusicBrainzDaemon}, daemon::{MusicBrainzDaemon, RequestChannel},
}; };
pub use listener::EventListener; pub use listener::EventListener;
pub use ui::Ui; pub use ui::Ui;