diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b057082b..7ee476558 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,9 @@ https://github.com/librespot-org/librespot - [all] `chrono` replaced with `time` (breaking) - [all] `time` updated (CVE-2020-26235) - [all] Improve lock contention and performance (breaking) +- [all] Use a single `player` instance. Eliminates occasional `player` and + `audio backend` restarts, which can cause issues with some playback + configurations. - [audio] Files are now downloaded over the HTTPS CDN (breaking) - [audio] Improve file opening and seeking performance (breaking) - [core] MSRV is now 1.65 (breaking) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 857a44092..be0a8b807 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -3,6 +3,7 @@ use std::{ future::Future, pin::Pin, sync::atomic::{AtomicUsize, Ordering}, + sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -77,8 +78,8 @@ enum SpircPlayStatus { type BoxedStream = Pin + Send>>; struct SpircTask { - player: Player, - mixer: Box, + player: Arc, + mixer: Arc, sequence: SeqGenerator, @@ -272,8 +273,8 @@ impl Spirc { config: ConnectConfig, session: Session, credentials: Credentials, - player: Player, - mixer: Box, + player: Arc, + mixer: Arc, ) -> Result<(Spirc, impl Future), Error> { let spirc_id = SPIRC_COUNTER.fetch_add(1, Ordering::AcqRel); debug!("new Spirc[{}]", spirc_id); @@ -663,6 +664,11 @@ impl SpircTask { } fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { + // update play_request_id + if let PlayerEvent::PlayRequestIdChanged { play_request_id } = event { + self.play_request_id = Some(play_request_id); + return Ok(()); + } // we only process events if the play_request_id matches. If it doesn't, it is // an event that belongs to a previous track and only arrives now due to a race // condition. In this case we have updated the state already and don't want to @@ -1462,7 +1468,7 @@ impl SpircTask { Some((track, index)) => { self.state.set_playing_track_index(index); - self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); + self.player.load(track, start_playing, position_ms); self.update_state_position(position_ms); if start_playing { diff --git a/examples/play.rs b/examples/play.rs index eb7dc3826..9e4e29afb 100644 --- a/examples/play.rs +++ b/examples/play.rs @@ -40,7 +40,7 @@ async fn main() { exit(1); } - let mut player = Player::new(player_config, session, Box::new(NoOpVolume), move || { + let player = Player::new(player_config, session, Box::new(NoOpVolume), move || { backend(None, audio_format) }); diff --git a/examples/play_connect.rs b/examples/play_connect.rs index 2b23a7d3f..a61d3d674 100644 --- a/examples/play_connect.rs +++ b/examples/play_connect.rs @@ -17,6 +17,7 @@ use librespot_metadata::{Album, Metadata}; use librespot_playback::mixer::{softmixer::SoftMixer, Mixer, MixerConfig}; use librespot_protocol::spirc::TrackRef; use std::env; +use std::sync::Arc; use tokio::join; #[tokio::main] @@ -54,7 +55,7 @@ async fn main() { session.clone(), credentials, player, - Box::new(SoftMixer::open(MixerConfig::default())), + Arc::new(SoftMixer::open(MixerConfig::default())), ) .await .unwrap(); diff --git a/playback/src/mixer/mod.rs b/playback/src/mixer/mod.rs index 0a8b8d6c1..83d008532 100644 --- a/playback/src/mixer/mod.rs +++ b/playback/src/mixer/mod.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::config::VolumeCtrl; pub mod mappings; @@ -5,7 +7,7 @@ use self::mappings::MappedCtrl; pub struct NoOpVolume; -pub trait Mixer: Send { +pub trait Mixer: Send + Sync { fn open(config: MixerConfig) -> Self where Self: Sized; @@ -55,10 +57,10 @@ impl Default for MixerConfig { } } -pub type MixerFn = fn(MixerConfig) -> Box; +pub type MixerFn = fn(MixerConfig) -> Arc; -fn mk_sink(config: MixerConfig) -> Box { - Box::new(M::open(config)) +fn mk_sink(config: MixerConfig) -> Arc { + Arc::new(M::open(config)) } pub const MIXERS: &[(&str, MixerFn)] = &[ diff --git a/playback/src/player.rs b/playback/src/player.rs index 0790b40ab..162750531 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -55,7 +55,6 @@ pub type PlayerResult = Result<(), Error>; pub struct Player { commands: Option>, thread_handle: Option>, - play_request_id_generator: SeqGenerator, } #[derive(PartialEq, Eq, Debug, Clone, Copy)] @@ -88,6 +87,7 @@ struct PlayerInternal { auto_normalise_as_album: bool, player_id: usize, + play_request_id_generator: SeqGenerator, } static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -95,7 +95,6 @@ static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0); enum PlayerCommand { Load { track_id: SpotifyId, - play_request_id: u64, play: bool, position_ms: u32, }, @@ -106,6 +105,7 @@ enum PlayerCommand { Pause, Stop, Seek(u32), + SetSession(Session), AddEventSender(mpsc::UnboundedSender), SetSinkEventCallback(Option), EmitVolumeChangedEvent(u16), @@ -132,6 +132,10 @@ enum PlayerCommand { #[derive(Debug, Clone)] pub enum PlayerEvent { + // Play request id changed + PlayRequestIdChanged { + play_request_id: u64, + }, // Fired when the player is stopped (e.g. by issuing a "stop" command to the player). Stopped { play_request_id: u64, @@ -416,7 +420,7 @@ impl Player { session: Session, volume_getter: Box, sink_builder: F, - ) -> Self + ) -> Arc where F: FnOnce() -> Box + Send + 'static, { @@ -475,6 +479,7 @@ impl Player { auto_normalise_as_album: false, player_id, + play_request_id_generator: SeqGenerator::new(0), }; // While PlayerInternal is written as a future, it still contains blocking code. @@ -485,11 +490,17 @@ impl Player { debug!("PlayerInternal thread finished."); }); - Self { + Arc::new(Self { commands: Some(cmd_tx), thread_handle: Some(handle), - play_request_id_generator: SeqGenerator::new(0), + }) + } + + pub fn is_invalid(&self) -> bool { + if let Some(handle) = self.thread_handle.as_ref() { + return handle.is_finished(); } + true } fn command(&self, cmd: PlayerCommand) { @@ -500,16 +511,12 @@ impl Player { } } - pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 { - let play_request_id = self.play_request_id_generator.get(); + pub fn load(&self, track_id: SpotifyId, start_playing: bool, position_ms: u32) { self.command(PlayerCommand::Load { track_id, - play_request_id, play: start_playing, position_ms, }); - - play_request_id } pub fn preload(&self, track_id: SpotifyId) { @@ -532,6 +539,10 @@ impl Player { self.command(PlayerCommand::Seek(position_ms)); } + pub fn set_session(&self, session: Session) { + self.command(PlayerCommand::SetSession(session)); + } + pub fn get_player_event_channel(&self) -> PlayerEventChannel { let (event_sender, event_receiver) = mpsc::unbounded_channel(); self.command(PlayerCommand::AddEventSender(event_sender)); @@ -1379,10 +1390,6 @@ impl Future for PlayerInternal { } } - if self.session.is_invalid() { - return Poll::Ready(()); - } - if (!self.state.is_playing()) && all_futures_completed_or_not_ready { return Poll::Pending; } @@ -1754,10 +1761,15 @@ impl PlayerInternal { fn handle_command_load( &mut self, track_id: SpotifyId, - play_request_id: u64, + play_request_id_option: Option, play: bool, position_ms: u32, ) -> PlayerResult { + let play_request_id = + play_request_id_option.unwrap_or(self.play_request_id_generator.get()); + + self.send_event(PlayerEvent::PlayRequestIdChanged { play_request_id }); + if !self.config.gapless { self.ensure_sink_stopped(play); } @@ -2010,7 +2022,7 @@ impl PlayerInternal { { return self.handle_command_load( track_id, - play_request_id, + Some(play_request_id), start_playback, position_ms, ); @@ -2067,10 +2079,9 @@ impl PlayerInternal { match cmd { PlayerCommand::Load { track_id, - play_request_id, play, position_ms, - } => self.handle_command_load(track_id, play_request_id, play, position_ms)?, + } => self.handle_command_load(track_id, None, play, position_ms)?, PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id), @@ -2082,6 +2093,8 @@ impl PlayerInternal { PlayerCommand::Stop => self.handle_player_stop(), + PlayerCommand::SetSession(session) => self.session = session, + PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender), PlayerCommand::SetSinkEventCallback(callback) => self.sink_event_callback = callback, @@ -2272,6 +2285,7 @@ impl fmt::Debug for PlayerCommand { PlayerCommand::Pause => f.debug_tuple("Pause").finish(), PlayerCommand::Stop => f.debug_tuple("Stop").finish(), PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(), + PlayerCommand::SetSession(_) => f.debug_tuple("SetSession").finish(), PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(), PlayerCommand::SetSinkEventCallback(_) => { f.debug_tuple("SetSinkEventCallback").finish() diff --git a/src/main.rs b/src/main.rs index e8c2e66f0..11574d925 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1715,6 +1715,31 @@ async fn main() { exit(1); } + let mixer_config = setup.mixer_config.clone(); + let mixer = (setup.mixer)(mixer_config); + let player_config = setup.player_config.clone(); + + let soft_volume = mixer.get_soft_volume(); + let format = setup.format; + let backend = setup.backend; + let device = setup.device.clone(); + let player = Player::new(player_config, session.clone(), soft_volume, move || { + (backend)(device, format) + }); + + if let Some(player_event_program) = setup.player_event_program.clone() { + _event_handler = Some(EventHandler::new( + player.get_player_event_channel(), + &player_event_program, + )); + + if setup.emit_sink_events { + player.set_sink_event_callback(Some(Box::new(move |sink_status| { + run_program_on_sink_events(sink_status, &player_event_program) + }))); + } + } + loop { tokio::select! { credentials = async { @@ -1737,6 +1762,9 @@ async fn main() { // Continue shutdown in its own task tokio::spawn(spirc_task); } + if !session.is_invalid() { + session.shutdown(); + } connecting = true; }, @@ -1749,32 +1777,16 @@ async fn main() { _ = async {}, if connecting && last_credentials.is_some() => { if session.is_invalid() { session = Session::new(setup.session_config.clone(), setup.cache.clone()); + player.set_session(session.clone()); } - let mixer_config = setup.mixer_config.clone(); - let mixer = (setup.mixer)(mixer_config); - let player_config = setup.player_config.clone(); let connect_config = setup.connect_config.clone(); - let soft_volume = mixer.get_soft_volume(); - let format = setup.format; - let backend = setup.backend; - let device = setup.device.clone(); - let player = Player::new(player_config, session.clone(), soft_volume, move || { - (backend)(device, format) - }); - - if let Some(player_event_program) = setup.player_event_program.clone() { - _event_handler = Some(EventHandler::new(player.get_player_event_channel(), &player_event_program)); - - if setup.emit_sink_events { - player.set_sink_event_callback(Some(Box::new(move |sink_status| { - run_program_on_sink_events(sink_status, &player_event_program) - }))); - } - }; - - let (spirc_, spirc_task_) = match Spirc::new(connect_config, session.clone(), last_credentials.clone().unwrap_or_default(), player, mixer).await { + let (spirc_, spirc_task_) = match Spirc::new(connect_config, + session.clone(), + last_credentials.clone().unwrap_or_default(), + player.clone(), + mixer.clone()).await { Ok((spirc_, spirc_task_)) => (spirc_, spirc_task_), Err(e) => { error!("could not initialize spirc: {}", e); @@ -1802,12 +1814,19 @@ async fn main() { if last_credentials.is_some() && !reconnect_exceeds_rate_limit() { auto_connect_times.push(Instant::now()); + if !session.is_invalid() { + session.shutdown(); + } connecting = true; } else { error!("Spirc shut down too often. Not reconnecting automatically."); exit(1); } }, + _ = async {}, if player.is_invalid() => { + error!("Player shut down unexpectedly"); + exit(1); + }, _ = tokio::signal::ctrl_c() => { break; }, diff --git a/src/player_event_handler.rs b/src/player_event_handler.rs index d9d6de21e..3d0a47df5 100644 --- a/src/player_event_handler.rs +++ b/src/player_event_handler.rs @@ -21,6 +21,10 @@ impl EventHandler { let mut env_vars = HashMap::new(); match event { + PlayerEvent::PlayRequestIdChanged { play_request_id } => { + env_vars.insert("PLAYER_EVENT", "play_request_id_changed".to_string()); + env_vars.insert("PLAY_REQUEST_ID", play_request_id.to_string()); + } PlayerEvent::TrackChanged { audio_item } => { match audio_item.track_id.to_base62() { Err(e) => {