diff --git a/CHANGELOG.md b/CHANGELOG.md index 62c7eb953..2634915c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,9 @@ https://github.com/librespot-org/librespot - [all] `chrono` replaced with `time` (breaking) - [all] `time` updated (CVE-2020-26235) - [all] Improve lock contention and performance (breaking) +- [all] Use a single `player` instance. Eliminates occasional `player` and + `audio backend` restarts, which can cause issues with some playback + configurations. - [audio] Files are now downloaded over the HTTPS CDN (breaking) - [audio] Improve file opening and seeking performance (breaking) - [core] MSRV is now 1.65 (breaking) diff --git a/Cargo.lock b/Cargo.lock index 1f301939f..d64310141 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2395,9 +2395,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.100.1" +version = "0.100.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" +checksum = "e98ff011474fa39949b7e5c0428f9b4937eda7da7848bbb947786b7be0b27dab" dependencies = [ "ring", "untrusted", diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 396970fdb..ed1e86131 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -3,6 +3,7 @@ use std::{ future::Future, pin::Pin, sync::atomic::{AtomicUsize, Ordering}, + sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -77,8 +78,8 @@ enum SpircPlayStatus { type BoxedStream = Pin + Send>>; struct SpircTask { - player: Player, - mixer: Box, + player: Arc, + mixer: Arc, sequence: SeqGenerator, @@ -272,8 +273,8 @@ impl Spirc { config: ConnectConfig, session: Session, credentials: Credentials, - player: Player, - mixer: Box, + player: Arc, + mixer: Arc, ) -> Result<(Spirc, impl Future), Error> { let spirc_id = SPIRC_COUNTER.fetch_add(1, Ordering::AcqRel); debug!("new Spirc[{}]", spirc_id); @@ -663,6 +664,11 @@ impl SpircTask { } fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { + // update play_request_id + if let PlayerEvent::PlayRequestIdChanged { play_request_id } = event { + self.play_request_id = Some(play_request_id); + return Ok(()); + } // we only process events if the play_request_id matches. If it doesn't, it is // an event that belongs to a previous track and only arrives now due to a race // condition. In this case we have updated the state already and don't want to @@ -1462,7 +1468,7 @@ impl SpircTask { Some((track, index)) => { self.state.set_playing_track_index(index); - self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); + self.player.load(track, start_playing, position_ms); self.update_state_position(position_ms); if start_playing { diff --git a/core/src/authentication.rs b/core/src/authentication.rs index a536b5587..8122d6591 100644 --- a/core/src/authentication.rs +++ b/core/src/authentication.rs @@ -27,7 +27,7 @@ impl From for Error { } /// The credentials are used to log into the Spotify API. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] pub struct Credentials { pub username: String, diff --git a/core/src/session.rs b/core/src/session.rs index 58397c8d5..3151c6a3e 100755 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -176,7 +176,13 @@ impl Session { self.set_username(&reusable_credentials.username); if let Some(cache) = self.cache() { if store_credentials { - cache.save_credentials(&reusable_credentials); + let cred_changed = cache + .credentials() + .map(|c| c != reusable_credentials) + .unwrap_or(true); + if cred_changed { + cache.save_credentials(&reusable_credentials); + } } } diff --git a/examples/play_connect.rs b/examples/play_connect.rs index f7a84d43b..dd9c6780c 100644 --- a/examples/play_connect.rs +++ b/examples/play_connect.rs @@ -17,6 +17,7 @@ use librespot_metadata::{Album, Metadata}; use librespot_playback::mixer::{softmixer::SoftMixer, Mixer, MixerConfig}; use librespot_protocol::spirc::TrackRef; use std::env; +use std::sync::Arc; use tokio::join; #[tokio::main] @@ -55,7 +56,7 @@ async fn main() { session.clone(), credentials, player, - Box::new(SoftMixer::open(MixerConfig::default())), + Arc::new(SoftMixer::open(MixerConfig::default())), ) .await .unwrap(); diff --git a/playback/src/audio_backend/jackaudio.rs b/playback/src/audio_backend/jackaudio.rs index 1236e1f30..b38f4612d 100644 --- a/playback/src/audio_backend/jackaudio.rs +++ b/playback/src/audio_backend/jackaudio.rs @@ -55,8 +55,8 @@ impl Open for JackSink { let client_name = client_name.unwrap_or_else(|| "librespot".to_string()); let (client, _status) = Client::new(&client_name[..], ClientOptions::NO_START_SERVER).unwrap(); - let ch_r = client.register_port("out_0", AudioOut::default()).unwrap(); - let ch_l = client.register_port("out_1", AudioOut::default()).unwrap(); + let ch_r = client.register_port("out_0", AudioOut).unwrap(); + let ch_l = client.register_port("out_1", AudioOut).unwrap(); // buffer for samples from librespot (~10ms) let (tx, rx) = sync_channel::(NUM_CHANNELS as usize * 1024 * AudioFormat::F32.size()); let jack_data = JackData { diff --git a/playback/src/mixer/mod.rs b/playback/src/mixer/mod.rs index 2d89d30e9..9571c2a87 100644 --- a/playback/src/mixer/mod.rs +++ b/playback/src/mixer/mod.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::config::VolumeCtrl; pub mod mappings; @@ -5,7 +7,7 @@ use self::mappings::MappedCtrl; pub struct NoOpVolume; -pub trait Mixer: Send { +pub trait Mixer: Send + Sync { fn open(config: MixerConfig) -> Self where Self: Sized; @@ -55,10 +57,10 @@ impl Default for MixerConfig { } } -pub type MixerFn = fn(MixerConfig) -> Box; +pub type MixerFn = fn(MixerConfig) -> Arc; -fn mk_sink(config: MixerConfig) -> Box { - Box::new(M::open(config)) +fn mk_sink(config: MixerConfig) -> Arc { + Arc::new(M::open(config)) } pub const MIXERS: &[(&str, MixerFn)] = &[ diff --git a/playback/src/player.rs b/playback/src/player.rs index aa29c818f..d99b7756d 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -51,7 +51,6 @@ pub type PlayerResult = Result<(), Error>; pub struct Player { commands: Option>, thread_handle: Option>, - play_request_id_generator: SeqGenerator, } #[derive(PartialEq, Eq, Debug, Clone, Copy)] @@ -79,6 +78,7 @@ struct PlayerInternal { auto_normalise_as_album: bool, player_id: usize, + play_request_id_generator: SeqGenerator, } pub static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -86,7 +86,6 @@ pub static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0); enum PlayerCommand { Load { track_id: SpotifyId, - play_request_id: u64, play: bool, position_ms: u32, }, @@ -97,6 +96,7 @@ enum PlayerCommand { Pause, Stop, Seek(u32), + SetSession(Session), AddEventSender(mpsc::UnboundedSender), SetSinkEventCallback(Option), EmitVolumeChangedEvent(u16), @@ -123,6 +123,10 @@ enum PlayerCommand { #[derive(Debug, Clone)] pub enum PlayerEvent { + // Play request id changed + PlayRequestIdChanged { + play_request_id: u64, + }, // Fired when the player is stopped (e.g. by issuing a "stop" command to the player). Stopped { play_request_id: u64, @@ -318,7 +322,7 @@ impl Player { session: Session, volume_getter: Box, sink_builder: F, - ) -> Self + ) -> Arc where F: FnOnce() -> Box + Send + 'static, { @@ -349,6 +353,7 @@ impl Player { auto_normalise_as_album: false, player_id, + play_request_id_generator: SeqGenerator::new(0), }; // While PlayerInternal is written as a future, it still contains blocking code. @@ -382,11 +387,17 @@ impl Player { } }; - Self { + Arc::new(Self { commands: Some(cmd_tx), thread_handle: Some(handle), - play_request_id_generator: SeqGenerator::new(0), + }) + } + + pub fn is_invalid(&self) -> bool { + if let Some(handle) = self.thread_handle.as_ref() { + return handle.is_finished(); } + true } fn command(&self, cmd: PlayerCommand) { @@ -397,16 +408,12 @@ impl Player { } } - pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 { - let play_request_id = self.play_request_id_generator.get(); + pub fn load(&self, track_id: SpotifyId, start_playing: bool, position_ms: u32) { self.command(PlayerCommand::Load { track_id, - play_request_id, play: start_playing, position_ms, }); - - play_request_id } pub fn preload(&self, track_id: SpotifyId) { @@ -429,6 +436,10 @@ impl Player { self.command(PlayerCommand::Seek(position_ms)); } + pub fn set_session(&self, session: Session) { + self.command(PlayerCommand::SetSession(session)); + } + pub fn get_player_event_channel(&self) -> PlayerEventChannel { let (event_sender, event_receiver) = mpsc::unbounded_channel(); self.command(PlayerCommand::AddEventSender(event_sender)); @@ -1264,10 +1275,6 @@ impl Future for PlayerInternal { } } - if self.session.is_invalid() { - return Poll::Ready(()); - } - if (!self.state.is_playing()) && all_futures_completed_or_not_ready { return Poll::Pending; } @@ -1515,10 +1522,15 @@ impl PlayerInternal { fn handle_command_load( &mut self, track_id: SpotifyId, - play_request_id: u64, + play_request_id_option: Option, play: bool, position_ms: u32, ) -> PlayerResult { + let play_request_id = + play_request_id_option.unwrap_or(self.play_request_id_generator.get()); + + self.send_event(PlayerEvent::PlayRequestIdChanged { play_request_id }); + if !self.config.gapless { self.ensure_sink_stopped(play); } @@ -1771,7 +1783,7 @@ impl PlayerInternal { { return self.handle_command_load( track_id, - play_request_id, + Some(play_request_id), start_playback, position_ms, ); @@ -1828,10 +1840,9 @@ impl PlayerInternal { match cmd { PlayerCommand::Load { track_id, - play_request_id, play, position_ms, - } => self.handle_command_load(track_id, play_request_id, play, position_ms)?, + } => self.handle_command_load(track_id, None, play, position_ms)?, PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id), @@ -1843,6 +1854,8 @@ impl PlayerInternal { PlayerCommand::Stop => self.handle_player_stop(), + PlayerCommand::SetSession(session) => self.session = session, + PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender), PlayerCommand::SetSinkEventCallback(callback) => self.sink_event_callback = callback, @@ -2057,6 +2070,7 @@ impl fmt::Debug for PlayerCommand { PlayerCommand::Pause => f.debug_tuple("Pause").finish(), PlayerCommand::Stop => f.debug_tuple("Stop").finish(), PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(), + PlayerCommand::SetSession(_) => f.debug_tuple("SetSession").finish(), PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(), PlayerCommand::SetSinkEventCallback(_) => { f.debug_tuple("SetSinkEventCallback").finish() diff --git a/src/main.rs b/src/main.rs index f959267a7..99a930c8e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1747,6 +1747,31 @@ async fn main() { exit(1); } + let mixer_config = setup.mixer_config.clone(); + let mixer = (setup.mixer)(mixer_config); + let player_config = setup.player_config.clone(); + + let soft_volume = mixer.get_soft_volume(); + let format = setup.format; + let backend = setup.backend; + let device = setup.device.clone(); + let player = Player::new(player_config, session.clone(), soft_volume, move || { + (backend)(device, format) + }); + + if let Some(player_event_program) = setup.player_event_program.clone() { + _event_handler = Some(EventHandler::new( + player.get_player_event_channel(), + &player_event_program, + )); + + if setup.emit_sink_events { + player.set_sink_event_callback(Some(Box::new(move |sink_status| { + run_program_on_sink_events(sink_status, &player_event_program) + }))); + } + } + loop { tokio::select! { credentials = async { @@ -1769,6 +1794,9 @@ async fn main() { // Continue shutdown in its own task tokio::spawn(spirc_task); } + if !session.is_invalid() { + session.shutdown(); + } connecting = true; }, @@ -1781,11 +1809,9 @@ async fn main() { _ = async {}, if connecting && last_credentials.is_some() => { if session.is_invalid() { session = Session::new(setup.session_config.clone(), setup.cache.clone()); + player.set_session(session.clone()); } - let mixer_config = setup.mixer_config.clone(); - let mixer = (setup.mixer)(mixer_config); - let player_config = setup.player_config.clone(); let connect_config = setup.connect_config.clone(); let soft_volume = mixer.get_soft_volume(); @@ -1844,12 +1870,19 @@ async fn main() { if last_credentials.is_some() && !reconnect_exceeds_rate_limit() { auto_connect_times.push(Instant::now()); + if !session.is_invalid() { + session.shutdown(); + } connecting = true; } else { error!("Spirc shut down too often. Not reconnecting automatically."); exit(1); } }, + _ = async {}, if player.is_invalid() => { + error!("Player shut down unexpectedly"); + exit(1); + }, _ = tokio::signal::ctrl_c() => { break; }, diff --git a/src/player_event_handler.rs b/src/player_event_handler.rs index 0c16a0535..8af195fc8 100644 --- a/src/player_event_handler.rs +++ b/src/player_event_handler.rs @@ -42,6 +42,10 @@ impl EventHandler { let mut env_vars = HashMap::new(); match event { + PlayerEvent::PlayRequestIdChanged { play_request_id } => { + env_vars.insert("PLAYER_EVENT", "play_request_id_changed".to_string()); + env_vars.insert("PLAY_REQUEST_ID", play_request_id.to_string()); + } PlayerEvent::TrackChanged { audio_item } => { match audio_item.track_id.to_base62() { Err(e) => {