Some clean up

This commit is contained in:
Wojciech Kozlowski 2024-09-21 13:43:25 +02:00
parent 21d0034b97
commit 993de9a38b
4 changed files with 243 additions and 237 deletions

View File

@ -26,7 +26,7 @@ use musichoard::{
}; };
use tui::{ use tui::{
App, EventChannel, EventHandler, EventListener, MusicBrainz, MusicBrainzDaemon, RequestChannel, App, EventChannel, EventHandler, EventListener, JobChannel, MusicBrainz, MusicBrainzDaemon,
Tui, Ui, Tui, Ui,
}; };
@ -94,7 +94,7 @@ fn with<Database: IDatabase + 'static, Library: ILibrary + 'static>(
let listener = EventListener::new(listener_sender); let listener = EventListener::new(listener_sender);
let handler = EventHandler::new(channel.receiver()); let handler = EventHandler::new(channel.receiver());
let mb_request_channel = RequestChannel::new(); let mb_request_channel = JobChannel::new();
thread::spawn(|| MusicBrainzDaemon::run(musicbrainz, mb_request_channel.receiver, app_sender)); thread::spawn(|| MusicBrainzDaemon::run(musicbrainz, mb_request_channel.receiver, app_sender));
let app = App::new(music_hoard, mb_request_channel.sender); let app = App::new(music_hoard, mb_request_channel.sender);

View File

@ -1,4 +1,7 @@
use std::{collections::VecDeque, sync::mpsc::{self, TryRecvError}}; use std::{
collections::VecDeque,
sync::mpsc::{self, TryRecvError},
};
use musichoard::collection::{artist::Artist, musicbrainz::IMusicBrainzRef}; use musichoard::collection::{artist::Artist, musicbrainz::IMusicBrainzRef};
@ -8,10 +11,7 @@ use crate::tui::{
AppPublicState, AppState, IAppEventFetch, IAppInteractFetch, MatchStateInfo, AppPublicState, AppState, IAppEventFetch, IAppInteractFetch, MatchStateInfo,
}, },
lib::{ lib::{
external::musicbrainz::daemon::{ external::musicbrainz::daemon::{ApiParams, Job, JobInstance, JobPriority, ResultSender},
ApiParams, Job, JobInstance, JobPriority, ReturnSender, SearchArtistParams,
SearchParams, SearchReleaseGroupParams,
},
interface::musicbrainz::api::Error as MbError, interface::musicbrainz::api::Error as MbError,
}, },
}; };
@ -79,21 +79,18 @@ impl AppMachine<FetchState> {
} }
} }
fn submit_fetch_job(inner: &AppInner, artist: &Artist, tx: ReturnSender) { fn submit_fetch_job(inner: &AppInner, artist: &Artist, tx: ResultSender) {
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
match artist.meta.musicbrainz { match artist.meta.musicbrainz {
Some(ref arid) => { Some(ref arid) => {
let arid = arid.mbid().clone(); let arid = arid.mbid();
for album in artist.albums.iter() { for album in artist.albums.iter() {
queue.push_back(ApiParams::search(SearchParams::release_group( let meta = &album.meta;
SearchReleaseGroupParams::new(arid.clone(), album.meta.clone()), queue.push_back(ApiParams::search_release_group(arid.clone(), meta.clone()));
)));
} }
} }
None => { None => {
queue.push_back(ApiParams::search(SearchParams::artist( queue.push_back(ApiParams::search_artist(artist.meta.clone()));
SearchArtistParams::new(artist.meta.clone()),
)));
} }
} }
let job = Job::new(JobPriority::Background, JobInstance::new(tx, queue)); let job = Job::new(JobPriority::Background, JobInstance::new(tx, queue));

View File

@ -9,27 +9,20 @@ use crate::tui::{
}; };
pub enum Error { pub enum Error {
RequestRecv(String), EventChannelDisconnected,
Api(String), RequestChannelDisconnected,
}
impl From<mpsc::RecvError> for Error {
fn from(value: mpsc::RecvError) -> Self {
Error::RequestRecv(value.to_string())
}
}
impl From<ApiError> for Error {
fn from(value: ApiError) -> Self {
Error::Api(value.to_string())
}
} }
pub struct MusicBrainzDaemon { pub struct MusicBrainzDaemon {
api: Box<dyn IMusicBrainz>, musicbrainz: Box<dyn IMusicBrainz>,
request_receiver: mpsc::Receiver<Job>, job_receiver: mpsc::Receiver<Job>,
job_queue: JobQueue, job_queue: JobQueue,
events: EventSender, event_sender: EventSender,
}
enum JobError {
JobQueueEmpty,
EventChannelDisconnected,
} }
struct JobQueue { struct JobQueue {
@ -37,6 +30,226 @@ struct JobQueue {
background_queue: VecDeque<JobInstance>, background_queue: VecDeque<JobInstance>,
} }
pub struct Job {
priority: JobPriority,
instance: JobInstance,
}
impl Job {
pub fn new(priority: JobPriority, instance: JobInstance) -> Self {
Job { priority, instance }
}
}
pub enum JobPriority {
Foreground,
Background,
}
pub type ResultSender = mpsc::Sender<Result<MatchStateInfo, ApiError>>;
pub struct JobInstance {
result_sender: ResultSender,
call_queue: VecDeque<ApiParams>,
}
enum JobInstanceStatus {
Continue,
Complete,
}
enum JobInstanceError {
ReturnChannelDisconnected,
EventChannelDisconnected,
}
impl JobInstance {
pub fn new(result_sender: ResultSender, call_queue: VecDeque<ApiParams>) -> Self {
JobInstance {
result_sender,
call_queue,
}
}
}
pub enum ApiParams {
Search(SearchParams),
}
impl ApiParams {
pub fn search_artist(artist: ArtistMeta) -> Self {
ApiParams::Search(SearchParams::Artist(SearchArtistParams { artist }))
}
pub fn search_release_group(arid: Mbid, album: AlbumMeta) -> Self {
ApiParams::Search(SearchParams::ReleaseGroup(SearchReleaseGroupParams {
arid,
album,
}))
}
}
pub enum SearchParams {
Artist(SearchArtistParams),
ReleaseGroup(SearchReleaseGroupParams),
}
pub struct SearchArtistParams {
artist: ArtistMeta,
}
pub struct SearchReleaseGroupParams {
arid: Mbid,
album: AlbumMeta,
}
pub struct JobChannel {
pub receiver: mpsc::Receiver<Job>,
pub sender: mpsc::Sender<Job>,
}
impl JobChannel {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel();
JobChannel { receiver, sender }
}
}
impl MusicBrainzDaemon {
pub fn run<MB: IMusicBrainz + 'static>(
musicbrainz: MB,
job_receiver: mpsc::Receiver<Job>,
event_sender: EventSender,
) -> Result<(), Error> {
let daemon = MusicBrainzDaemon {
musicbrainz: Box::new(musicbrainz),
job_receiver,
job_queue: JobQueue::new(),
event_sender,
};
daemon.main()
}
fn main(mut self) -> Result<(), Error> {
loop {
self.enqueue_all_pending_jobs()?;
match self.execute_next_job() {
Ok(()) => {
// Sleep for one second. Required by MB API rate limiting. Assume all other
// processing takes negligible time such that regardless of how much other
// processing there is to be done, this one second sleep is necessary.
thread::sleep(time::Duration::from_secs(1));
}
Err(JobError::JobQueueEmpty) => {
self.wait_for_jobs()?;
}
Err(JobError::EventChannelDisconnected) => {
return Err(Error::EventChannelDisconnected);
}
}
}
}
fn enqueue_all_pending_jobs(&mut self) -> Result<(), Error> {
loop {
match self.job_receiver.try_recv() {
Ok(job) => self.job_queue.push_back(job),
Err(mpsc::TryRecvError::Empty) => return Ok(()),
Err(mpsc::TryRecvError::Disconnected) => {
return Err(Error::RequestChannelDisconnected);
}
}
}
}
fn execute_next_job(&mut self) -> Result<(), JobError> {
if let Some(instance) = self.job_queue.front_mut() {
let result = instance.execute_next(&mut self.musicbrainz, &mut self.event_sender);
match result {
Ok(JobInstanceStatus::Continue) => {}
Ok(JobInstanceStatus::Complete)
| Err(JobInstanceError::ReturnChannelDisconnected) => {
self.job_queue.pop_front();
}
Err(JobInstanceError::EventChannelDisconnected) => {
return Err(JobError::EventChannelDisconnected)
}
}
return Ok(());
}
Err(JobError::JobQueueEmpty)
}
fn wait_for_jobs(&mut self) -> Result<(), Error> {
assert!(self.job_queue.is_empty());
match self.job_receiver.recv() {
Ok(job) => self.job_queue.push_back(job),
Err(mpsc::RecvError) => return Err(Error::RequestChannelDisconnected),
}
Ok(())
}
}
impl JobInstance {
fn execute_next(
&mut self,
musicbrainz: &mut Box<dyn IMusicBrainz>,
event_sender: &mut EventSender,
) -> Result<JobInstanceStatus, JobInstanceError> {
// self.call_queue can be empty if the caller submits an empty job.
if let Some(params) = self.call_queue.pop_front() {
self.execute(musicbrainz, event_sender, params)?
}
if self.call_queue.is_empty() {
Ok(JobInstanceStatus::Complete)
} else {
Ok(JobInstanceStatus::Continue)
}
}
fn execute(
&mut self,
musicbrainz: &mut Box<dyn IMusicBrainz>,
event_sender: &mut EventSender,
api_params: ApiParams,
) -> Result<(), JobInstanceError> {
match api_params {
ApiParams::Search(search) => match search {
SearchParams::Artist(params) => {
let result = musicbrainz.search_artist(&params.artist);
let result = result.map(|list| MatchStateInfo::artist(params.artist, list));
self.return_result(event_sender, result)
}
SearchParams::ReleaseGroup(params) => {
let result = musicbrainz.search_release_group(&params.arid, &params.album);
let result = result.map(|list| MatchStateInfo::album(params.album, list));
self.return_result(event_sender, result)
}
},
}
}
fn return_result(
&mut self,
event_sender: &mut EventSender,
result: Result<MatchStateInfo, ApiError>,
) -> Result<(), JobInstanceError> {
self.result_sender
.send(result)
.map_err(|_| JobInstanceError::ReturnChannelDisconnected)?;
// If this send fails the event listener is dead. Don't panic as this function runs in a
// detached thread so this might be happening during normal shut down.
event_sender
.send(Event::FetchResultReady)
.map_err(|_| JobInstanceError::EventChannelDisconnected)?;
Ok(())
}
}
impl JobQueue { impl JobQueue {
fn new() -> Self { fn new() -> Self {
JobQueue { JobQueue {
@ -68,207 +281,3 @@ impl JobQueue {
} }
} }
} }
pub struct Job {
priority: JobPriority,
instance: JobInstance,
}
impl Job {
pub fn new(priority: JobPriority, instance: JobInstance) -> Self {
Job { priority, instance }
}
}
pub enum JobPriority {
Foreground,
Background,
}
pub type ReturnSender = mpsc::Sender<Result<MatchStateInfo, ApiError>>;
pub struct JobInstance {
return_sender: ReturnSender,
call_queue: VecDeque<ApiParams>,
}
impl JobInstance {
pub fn new(return_sender: ReturnSender, call_queue: VecDeque<ApiParams>) -> Self {
JobInstance {
return_sender,
call_queue,
}
}
}
pub enum ApiParams {
Search(SearchParams),
}
impl ApiParams {
pub fn search(params: SearchParams) -> Self {
ApiParams::Search(params)
}
}
pub enum SearchParams {
Artist(SearchArtistParams),
ReleaseGroup(SearchReleaseGroupParams),
}
impl SearchParams {
pub fn artist(params: SearchArtistParams) -> Self {
SearchParams::Artist(params)
}
pub fn release_group(params: SearchReleaseGroupParams) -> Self {
SearchParams::ReleaseGroup(params)
}
}
pub struct SearchArtistParams {
artist: ArtistMeta,
}
impl SearchArtistParams {
pub fn new(artist: ArtistMeta) -> Self {
SearchArtistParams { artist }
}
}
pub struct SearchReleaseGroupParams {
arid: Mbid,
album: AlbumMeta,
}
impl SearchReleaseGroupParams {
pub fn new(arid: Mbid, album: AlbumMeta) -> Self {
SearchReleaseGroupParams { arid, album }
}
}
pub struct RequestChannel {
pub receiver: mpsc::Receiver<Job>,
pub sender: mpsc::Sender<Job>,
}
impl RequestChannel {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel();
RequestChannel { receiver, sender }
}
}
impl MusicBrainzDaemon {
pub fn run<MB: IMusicBrainz + 'static>(
api: MB,
request_receiver: mpsc::Receiver<Job>,
events: EventSender,
) -> Result<(), Error> {
let daemon = MusicBrainzDaemon {
api: Box::new(api),
request_receiver,
job_queue: JobQueue::new(),
events,
};
daemon.main()
}
fn wait_for_jobs(&mut self) -> Result<(), Error> {
if self.job_queue.is_empty() {
self.job_queue.push_back(self.request_receiver.recv()?);
}
Ok(())
}
fn enqueue_all_pending_jobs(&mut self) -> Result<(), Error> {
loop {
match self.request_receiver.try_recv() {
Ok(job) => self.job_queue.push_back(job),
Err(mpsc::TryRecvError::Empty) => return Ok(()),
Err(mpsc::TryRecvError::Disconnected) => {
return Err(Error::RequestRecv(
mpsc::TryRecvError::Disconnected.to_string(),
))
}
}
}
}
fn execute_next_job(&mut self) -> Option<()> {
if let Some(instance) = self.job_queue.front_mut() {
let result = instance.execute_next(&mut self.api, &mut self.events);
if result.is_none() {
self.job_queue.pop_front();
}
return Some(());
}
None
}
fn main(mut self) -> Result<(), Error> {
loop {
self.wait_for_jobs()?;
self.enqueue_all_pending_jobs()?;
if let Some(()) = self.execute_next_job() {
// Sleep for one second. Required by MB API rate limiting. Assume all other
// processing takes negligible time such that regardless of how much other
// processing there is to be done, this one second sleep is necessary.
thread::sleep(time::Duration::from_secs(1));
}
}
}
}
impl JobInstance {
fn execute_next(
&mut self,
api: &mut Box<dyn IMusicBrainz>,
events: &mut EventSender,
) -> Option<()> {
self.call_queue
.pop_front()
.map(|api_params| self.execute_call(api, events, api_params));
if !self.call_queue.is_empty() {
Some(())
} else {
None
}
}
fn execute_call(
&mut self,
api: &mut Box<dyn IMusicBrainz>,
events: &mut EventSender,
api_params: ApiParams,
) {
match api_params {
ApiParams::Search(search) => match search {
SearchParams::Artist(params) => {
let result = api.search_artist(&params.artist);
let result = result.map(|list| MatchStateInfo::artist(params.artist, list));
self.return_result(events, result).ok();
}
SearchParams::ReleaseGroup(params) => {
let result = api.search_release_group(&params.arid, &params.album);
let result = result.map(|list| MatchStateInfo::album(params.album, list));
self.return_result(events, result).ok();
}
},
}
}
fn return_result(
&mut self,
events: &mut EventSender,
result: Result<MatchStateInfo, ApiError>,
) -> Result<(), ()> {
// If receiver disconnects just drop the rest.
self.return_sender.send(result).map_err(|_| ())?;
// If this send fails the event listener is dead. Don't panic as this function runs in a
// detached thread so this might be happening during normal shut down.
events.send(Event::FetchResultReady).map_err(|_| ())?;
Ok(())
}
}

View File

@ -10,7 +10,7 @@ pub use event::EventChannel;
pub use handler::EventHandler; pub use handler::EventHandler;
pub use lib::external::musicbrainz::{ pub use lib::external::musicbrainz::{
api::MusicBrainz, api::MusicBrainz,
daemon::{MusicBrainzDaemon, RequestChannel}, daemon::{JobChannel, MusicBrainzDaemon},
}; };
pub use listener::EventListener; pub use listener::EventListener;
pub use ui::Ui; pub use ui::Ui;