From 903843fb673b17048030798caf469d1d594853c6 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:32:52 +0200 Subject: [PATCH 01/17] discovery: use opaque error type for DnsSdError This helps to decouple discovery and core by not leaking implementation details of the zeroconf backend into Error conversion impls in core. --- Cargo.lock | 1 - Cargo.toml | 2 +- core/Cargo.toml | 4 ---- core/src/error.rs | 10 ---------- discovery/Cargo.toml | 2 +- discovery/src/lib.rs | 11 ++++++++--- 6 files changed, 10 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a20c2d43c..aed760d25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1753,7 +1753,6 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "dns-sd", "form_urlencoded", "futures-core", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index fa5da5c38..cecb0cbc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,7 @@ rodiojack-backend = ["librespot-playback/rodiojack-backend"] sdl-backend = ["librespot-playback/sdl-backend"] gstreamer-backend = ["librespot-playback/gstreamer-backend"] -with-dns-sd = ["librespot-core/with-dns-sd", "librespot-discovery/with-dns-sd"] +with-dns-sd = ["librespot-discovery/with-dns-sd"] passthrough-decoder = ["librespot-playback/passthrough-decoder"] diff --git a/core/Cargo.toml b/core/Cargo.toml index 93357f938..a9208bdfe 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -22,7 +22,6 @@ aes = "0.8" base64 = "0.22" byteorder = "1.4" bytes = "1" -dns-sd = { version = "0.1", optional = true } form_urlencoded = "1.0" futures-core = "0.3" futures-util = { version = "0.3", features = ["alloc", "bilock", "sink", "unstable"] } @@ -71,6 +70,3 @@ vergen-gitcl = { version = "1.0.0", default-features = false, features = ["build [dev-dependencies] tokio = { version = "1", features = ["macros", "parking_lot"] } - -[features] -with-dns-sd = ["dns-sd"] diff --git a/core/src/error.rs b/core/src/error.rs index b18ce91a5..6b0178c95 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -21,9 +21,6 @@ use url::ParseError; use librespot_oauth::OAuthError; -#[cfg(feature = "with-dns-sd")] -use dns_sd::DNSError; - #[derive(Debug)] pub struct Error { pub kind: ErrorKind, @@ -314,13 +311,6 @@ impl From for Error { } } -#[cfg(feature = "with-dns-sd")] -impl From for Error { - fn from(err: DNSError) -> Self { - Self::new(ErrorKind::Unavailable, err) - } -} - impl From for Error { fn from(err: http::Error) -> Self { if err.is::() diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index 863f5d8f0..a13450786 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -39,4 +39,4 @@ hex = "0.4" tokio = { version = "1", features = ["macros", "parking_lot", "rt"] } [features] -with-dns-sd = ["dns-sd", "librespot-core/with-dns-sd"] +with-dns-sd = ["dns-sd"] diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index f1f0f6920..dc8fb85f1 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -11,7 +11,7 @@ mod server; use std::{ borrow::Cow, - io, + error::Error as StdError, pin::Pin, task::{Context, Poll}, }; @@ -55,12 +55,16 @@ pub struct Builder { pub enum DiscoveryError { #[error("Creating SHA1 block cipher failed")] AesError(#[from] aes::cipher::InvalidLength), + #[error("Setting up dns-sd failed: {0}")] - DnsSdError(#[from] io::Error), + DnsSdError(#[source] Box), + #[error("Creating SHA1 HMAC failed for base key {0:?}")] HmacError(Vec), + #[error("Setting up the HTTP server failed: {0}")] HttpServerError(#[from] hyper::Error), + #[error("Missing params for key {0}")] ParamsError(&'static str), } @@ -144,7 +148,8 @@ impl Builder { None, port, &["VERSION=1.0", "CPath=/"], - )?; + ) + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))?; } #[cfg(not(feature = "with-dns-sd"))] From 09a6171b6b47cca2b7db8a91e0f436bc72d66ea0 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:25:57 +0200 Subject: [PATCH 02/17] discovery: map all MDNS/DNS-SD errors to DiscoveryError::DnsSdError previously, libmdns errors would use a generic conversion from std::io::Error to core::Error --- discovery/src/lib.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index dc8fb85f1..638ca8e24 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -137,11 +137,10 @@ impl Builder { let name = self.server_config.name.clone().into_owned(); let server = DiscoveryServer::new(self.server_config, &mut port)?; let _zeroconf_ip = self.zeroconf_ip; - let svc; #[cfg(feature = "with-dns-sd")] - { - svc = dns_sd::DNSService::register( + let svc = { + dns_sd::DNSService::register( Some(name.as_ref()), "_spotify-connect._tcp", None, @@ -149,26 +148,27 @@ impl Builder { port, &["VERSION=1.0", "CPath=/"], ) - .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))?; - } + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))? + }; #[cfg(not(feature = "with-dns-sd"))] - { - let _svc = if !_zeroconf_ip.is_empty() { + let svc = { + if !_zeroconf_ip.is_empty() { libmdns::Responder::spawn_with_ip_list( &tokio::runtime::Handle::current(), _zeroconf_ip, - )? + ) } else { - libmdns::Responder::spawn(&tokio::runtime::Handle::current())? - }; - svc = _svc.register( + libmdns::Responder::spawn(&tokio::runtime::Handle::current()) + } + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))? + .register( "_spotify-connect._tcp".to_owned(), name, port, &["VERSION=1.0", "CPath=/"], - ); - } + ) + }; Ok(Discovery { server, _svc: svc }) } From 62e14e55f491853c0a1d9e12acacac5f95bcc737 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:29:03 +0200 Subject: [PATCH 03/17] discovery: de-duplicate zerconf data into module consts --- discovery/src/lib.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 638ca8e24..595186549 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -81,6 +81,9 @@ impl From for Error { } } +const DNS_SD_SERVICE_NAME: &str = "_spotify-connect._tcp"; +const TXT_RECORD: [&str; 2] = ["VERSION=1.0", "CPath=/"]; + impl Builder { /// Starts a new builder using the provided device and client IDs. pub fn new>(device_id: T, client_id: T) -> Self { @@ -142,11 +145,11 @@ impl Builder { let svc = { dns_sd::DNSService::register( Some(name.as_ref()), - "_spotify-connect._tcp", + &DNS_SD_SERVICE_NAME, None, None, port, - &["VERSION=1.0", "CPath=/"], + &TXT_RECORD, ) .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))? }; @@ -162,12 +165,7 @@ impl Builder { libmdns::Responder::spawn(&tokio::runtime::Handle::current()) } .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))? - .register( - "_spotify-connect._tcp".to_owned(), - name, - port, - &["VERSION=1.0", "CPath=/"], - ) + .register(DNS_SD_SERVICE_NAME.to_owned(), name, port, &TXT_RECORD) }; Ok(Discovery { server, _svc: svc }) From 9e6c615a2a7fef7f977b5a4b34e4c7ca6f0310d4 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:39:30 +0200 Subject: [PATCH 04/17] discovery: use an opaque type for the handle to the DNS-SD service in preparation for adding another backend: The only purpose of this is to unregister the service on drop. Thus, it is much easier to work with an opaque type, which avoids proliferation #[cfg(...)] gates. --- discovery/src/lib.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 595186549..4546baf1d 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -10,6 +10,7 @@ mod server; use std::{ + any::Any, borrow::Cow, error::Error as StdError, pin::Pin, @@ -37,10 +38,9 @@ pub use crate::core::config::DeviceType; pub struct Discovery { server: DiscoveryServer, - #[cfg(not(feature = "with-dns-sd"))] - _svc: libmdns::Service, - #[cfg(feature = "with-dns-sd")] - _svc: dns_sd::DNSService, + /// An opaque handle to the DNS-SD service. Dropping this will unregister the service. + #[allow(unused)] + svc: Box, } /// A builder for [`Discovery`]. @@ -168,7 +168,10 @@ impl Builder { .register(DNS_SD_SERVICE_NAME.to_owned(), name, port, &TXT_RECORD) }; - Ok(Discovery { server, _svc: svc }) + Ok(Discovery { + server, + svc: Box::new(svc), + }) } } From 5383be11691ff388e714fad3f935a621b5f6f759 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:14:35 +0200 Subject: [PATCH 05/17] discovery: move service registration into separate functions in preparation for re-working feature flags and adding another backend --- discovery/src/lib.rs | 78 +++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 4546baf1d..cded9a3e3 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -84,6 +84,47 @@ impl From for Error { const DNS_SD_SERVICE_NAME: &str = "_spotify-connect._tcp"; const TXT_RECORD: [&str; 2] = ["VERSION=1.0", "CPath=/"]; +#[cfg(feature = "with-dns-sd")] +fn launch_dns_sd( + name: Cow<'static, str>, + _zeroconf_ip: Vec, + port: u16, +) -> Result, Error> { + let svc = dns_sd::DNSService::register( + Some(name.as_ref()), + DNS_SD_SERVICE_NAME, + None, + None, + port, + &TXT_RECORD, + ) + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))?; + + Ok(Box::new(svc)) +} + +#[cfg(not(feature = "with-dns-sd"))] +fn launch_libmdns( + name: Cow<'static, str>, + zeroconf_ip: Vec, + port: u16, +) -> Result, Error> { + let svc = if !zeroconf_ip.is_empty() { + libmdns::Responder::spawn_with_ip_list(&tokio::runtime::Handle::current(), zeroconf_ip) + } else { + libmdns::Responder::spawn(&tokio::runtime::Handle::current()) + } + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))? + .register( + DNS_SD_SERVICE_NAME.to_owned(), + name.into_owned(), + port, + &TXT_RECORD, + ); + + Ok(Box::new(svc)) +} + impl Builder { /// Starts a new builder using the provided device and client IDs. pub fn new>(device_id: T, client_id: T) -> Self { @@ -136,42 +177,19 @@ impl Builder { /// # Errors /// If setting up the mdns service or creating the server fails, this function returns an error. pub fn launch(self) -> Result { + let name = self.server_config.name.clone(); + let zeroconf_ip = self.zeroconf_ip; + let mut port = self.port; - let name = self.server_config.name.clone().into_owned(); let server = DiscoveryServer::new(self.server_config, &mut port)?; - let _zeroconf_ip = self.zeroconf_ip; #[cfg(feature = "with-dns-sd")] - let svc = { - dns_sd::DNSService::register( - Some(name.as_ref()), - &DNS_SD_SERVICE_NAME, - None, - None, - port, - &TXT_RECORD, - ) - .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))? - }; + let svc = launch_dns_sd(name, zeroconf_ip, port)?; #[cfg(not(feature = "with-dns-sd"))] - let svc = { - if !_zeroconf_ip.is_empty() { - libmdns::Responder::spawn_with_ip_list( - &tokio::runtime::Handle::current(), - _zeroconf_ip, - ) - } else { - libmdns::Responder::spawn(&tokio::runtime::Handle::current()) - } - .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))? - .register(DNS_SD_SERVICE_NAME.to_owned(), name, port, &TXT_RECORD) - }; - - Ok(Discovery { - server, - svc: Box::new(svc), - }) + let svc = launch_libmdns(name, zeroconf_ip, port)?; + + Ok(Discovery { server, svc }) } } From 283a501bbe9a91fe2e06ae4b5fe2104561c0b592 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Mon, 23 Sep 2024 17:32:22 +0200 Subject: [PATCH 06/17] discovery: make features additive i.e. add with-libmdns instead of using not(with-dns-sd). This is in preparation for adding another backend, and in general recommended, cf. https://doc.rust-lang.org/cargo/reference/features.html#mutually-exclusive-features The logic is such that enabling with-dns-sd in addition to the default with-libmdns will still end up using dns-sd, as before. If only with-libmdns is enabled, that will be the default. If none of the features is enabled, attempting to build a `Discovery` will yield an error. --- Cargo.toml | 4 ++- discovery/Cargo.toml | 5 +++- discovery/src/lib.rs | 63 +++++++++++++++++++++++++++++++++++++++----- 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cecb0cbc0..cae18b0df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ version = "0.5.0" [dependencies.librespot-discovery] path = "discovery" version = "0.5.0" +default-features = false [dependencies.librespot-metadata] path = "metadata" @@ -76,10 +77,11 @@ sdl-backend = ["librespot-playback/sdl-backend"] gstreamer-backend = ["librespot-playback/gstreamer-backend"] with-dns-sd = ["librespot-discovery/with-dns-sd"] +with-libmdns = ["librespot-discovery/with-libmdns"] passthrough-decoder = ["librespot-playback/passthrough-decoder"] -default = ["rodio-backend"] +default = ["rodio-backend", "with-libmdns"] [package.metadata.deb] maintainer = "librespot-org" diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index a13450786..58c3e436f 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -21,7 +21,7 @@ hmac = "0.12" hyper = { version = "1.3", features = ["http1"] } hyper-util = { version = "0.1", features = ["server-auto", "server-graceful", "service"] } http-body-util = "0.1.1" -libmdns = "0.9" +libmdns = { version = "0.9", optional = true } log = "0.4" rand = "0.8" serde_json = "1.0" @@ -40,3 +40,6 @@ tokio = { version = "1", features = ["macros", "parking_lot", "rt"] } [features] with-dns-sd = ["dns-sd"] +with-libmdns = ["libmdns"] + +default = ["with-libmdns"] diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index cded9a3e3..79648a45b 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -31,6 +31,49 @@ pub use crate::core::authentication::Credentials; /// Determining the icon in the list of available devices. pub use crate::core::config::DeviceType; +pub type ServiceBuilder = + fn(Cow<'static, str>, Vec, u16) -> Result, Error>; + +// Default goes first: This matches the behaviour when feature flags were exlusive, i.e. when there +// was only `feature = "with-dns-sd"` or `not(feature = "with-dns-sd")` +pub const BACKENDS: &[( + &str, + // If None, the backend is known but wasn't compiled. + Option, +)] = &[ + #[cfg(feature = "with-dns-sd")] + ("dns-sd", Some(launch_dns_sd)), + #[cfg(not(feature = "with-dns-sd"))] + ("dns-sd", None), + #[cfg(feature = "with-libmdns")] + ("libmdns", Some(launch_libmdns)), + #[cfg(not(feature = "with-libmdns"))] + ("libmdns", None), +]; + +pub fn find(name: Option<&str>) -> Result { + if let Some(ref name) = name { + match BACKENDS.iter().find(|(id, _)| name == id) { + Some((_id, Some(launch_svc))) => Ok(*launch_svc), + Some((_id, None)) => Err(Error::unavailable(format!( + "librespot built without '{}' support", + name + ))), + None => Err(Error::not_found(format!( + "unknown zeroconf backend '{}'", + name + ))), + } + } else { + BACKENDS + .iter() + .find_map(|(_, launch_svc)| *launch_svc) + .ok_or(Error::unavailable( + "librespot built without zeroconf backends", + )) + } +} + /// Makes this device visible to Spotify clients in the local network. /// /// `Discovery` implements the [`Stream`] trait. Every time this device @@ -48,6 +91,7 @@ pub struct Builder { server_config: server::Config, port: u16, zeroconf_ip: Vec, + zeroconf_backend: Option, } /// Errors that can occur while setting up a [`Discovery`] instance. @@ -81,7 +125,9 @@ impl From for Error { } } +#[allow(unused)] const DNS_SD_SERVICE_NAME: &str = "_spotify-connect._tcp"; +#[allow(unused)] const TXT_RECORD: [&str; 2] = ["VERSION=1.0", "CPath=/"]; #[cfg(feature = "with-dns-sd")] @@ -103,7 +149,7 @@ fn launch_dns_sd( Ok(Box::new(svc)) } -#[cfg(not(feature = "with-dns-sd"))] +#[cfg(feature = "with-libmdns")] fn launch_libmdns( name: Cow<'static, str>, zeroconf_ip: Vec, @@ -138,6 +184,7 @@ impl Builder { }, port: 0, zeroconf_ip: vec![], + zeroconf_backend: None, } } @@ -165,6 +212,12 @@ impl Builder { self } + /// Set the zeroconf (MDNS and DNS-SD) implementation to use. + pub fn zeroconf_backend(mut self, zeroconf_backend: ServiceBuilder) -> Self { + self.zeroconf_backend = Some(zeroconf_backend); + self + } + /// Sets the port on which it should listen to incoming connections. /// The default value `0` means any port. pub fn port(mut self, port: u16) -> Self { @@ -183,12 +236,8 @@ impl Builder { let mut port = self.port; let server = DiscoveryServer::new(self.server_config, &mut port)?; - #[cfg(feature = "with-dns-sd")] - let svc = launch_dns_sd(name, zeroconf_ip, port)?; - - #[cfg(not(feature = "with-dns-sd"))] - let svc = launch_libmdns(name, zeroconf_ip, port)?; - + let launch_svc = self.zeroconf_backend.unwrap_or(find(None)?); + let svc = launch_svc(name, zeroconf_ip, port)?; Ok(Discovery { server, svc }) } } From c8744a52364f96d83f94297ea3d4da8c1628e7a4 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Mon, 23 Sep 2024 17:33:41 +0200 Subject: [PATCH 07/17] discovery: add --zeroconf-backend CLI flag --- src/main.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/main.rs b/src/main.rs index f87f332ee..e44300db7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -217,6 +217,7 @@ struct Setup { player_event_program: Option, emit_sink_events: bool, zeroconf_ip: Vec, + zeroconf_backend: librespot::discovery::ServiceBuilder, } fn get_setup() -> Setup { @@ -277,6 +278,7 @@ fn get_setup() -> Setup { const VOLUME_RANGE: &str = "volume-range"; const ZEROCONF_PORT: &str = "zeroconf-port"; const ZEROCONF_INTERFACE: &str = "zeroconf-interface"; + const ZEROCONF_BACKEND: &str = "zeroconf-backend"; // Mostly arbitrary. const AP_PORT_SHORT: &str = "a"; @@ -327,6 +329,7 @@ fn get_setup() -> Setup { const NORMALISATION_RELEASE_SHORT: &str = "y"; const NORMALISATION_THRESHOLD_SHORT: &str = "Z"; const ZEROCONF_PORT_SHORT: &str = "z"; + const ZEROCONF_BACKEND_SHORT: &str = ""; // no short flag // Options that have different descriptions // depending on what backends were enabled at build time. @@ -638,6 +641,12 @@ fn get_setup() -> Setup { ZEROCONF_INTERFACE, "Comma-separated interface IP addresses on which zeroconf will bind. Defaults to all interfaces. Ignored by DNS-SD.", "IP" + ) + .optopt( + ZEROCONF_BACKEND_SHORT, + ZEROCONF_BACKEND, + "Zeroconf (MDNS/DNS-SD) backend to use. Valid values are 'dns-sd' and 'libmdns', if librespot is compiled with the corresponding feature flags.", + "BACKEND" ); #[cfg(feature = "passthrough-decoder")] @@ -1293,6 +1302,29 @@ fn get_setup() -> Setup { vec![] }; + let zeroconf_backend_name = opt_str(ZEROCONF_BACKEND); + let zeroconf_backend = librespot::discovery::find(zeroconf_backend_name.as_deref()) + .unwrap_or_else(|_| { + let available_backends: Vec<_> = librespot::discovery::BACKENDS + .iter() + .filter_map(|(id, launch_svc)| launch_svc.map(|_| *id)) + .collect(); + let default_backend = librespot::discovery::BACKENDS + .iter() + .find_map(|(id, launch_svc)| launch_svc.map(|_| *id)) + .unwrap_or(""); + + invalid_error_msg( + ZEROCONF_BACKEND, + ZEROCONF_BACKEND_SHORT, + &zeroconf_backend_name.unwrap_or_default(), + &available_backends.join(", "), + default_backend, + ); + + exit(1); + }); + let connect_config = { let connect_default_config = ConnectConfig::default(); @@ -1739,6 +1771,7 @@ fn get_setup() -> Setup { player_event_program, emit_sink_events, zeroconf_ip, + zeroconf_backend, } } @@ -1787,6 +1820,7 @@ async fn main() { .is_group(setup.connect_config.is_group) .port(setup.zeroconf_port) .zeroconf_ip(setup.zeroconf_ip.clone()) + .zeroconf_backend(setup.zeroconf_backend) .launch() { Ok(d) => break Some(d), From 512c5580aa76468e88e58e2dcb4a735729fbb631 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Mon, 23 Sep 2024 18:02:14 +0200 Subject: [PATCH 08/17] discovery: Add minimal Avahi zeroconf backend --- Cargo.toml | 1 + discovery/Cargo.toml | 3 + discovery/src/avahi.rs | 151 +++++++++++++++++++++++++++++++++++++++++ discovery/src/lib.rs | 65 ++++++++++++++++++ src/main.rs | 2 +- 5 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 discovery/src/avahi.rs diff --git a/Cargo.toml b/Cargo.toml index cae18b0df..090e57b2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ rodiojack-backend = ["librespot-playback/rodiojack-backend"] sdl-backend = ["librespot-playback/sdl-backend"] gstreamer-backend = ["librespot-playback/gstreamer-backend"] +with-avahi = ["librespot-discovery/with-avahi"] with-dns-sd = ["librespot-discovery/with-dns-sd"] with-libmdns = ["librespot-discovery/with-libmdns"] diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index 58c3e436f..e26795f76 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -24,10 +24,12 @@ http-body-util = "0.1.1" libmdns = { version = "0.9", optional = true } log = "0.4" rand = "0.8" +serde = { version = "1", default-features = false, features = ["derive"], optional = true } serde_json = "1.0" sha1 = "0.10" thiserror = "1.0" tokio = { version = "1", features = ["parking_lot", "sync", "rt"] } +zbus = { version = "4", default-features = false, features = ["tokio"], optional = true } [dependencies.librespot-core] path = "../core" @@ -39,6 +41,7 @@ hex = "0.4" tokio = { version = "1", features = ["macros", "parking_lot", "rt"] } [features] +with-avahi = ["zbus", "serde"] with-dns-sd = ["dns-sd"] with-libmdns = ["libmdns"] diff --git a/discovery/src/avahi.rs b/discovery/src/avahi.rs new file mode 100644 index 000000000..a12a996a2 --- /dev/null +++ b/discovery/src/avahi.rs @@ -0,0 +1,151 @@ +#![cfg(feature = "with-avahi")] + +#[allow(unused)] +pub use server::ServerProxy; + +#[allow(unused)] +pub use entry_group::{ + EntryGroupProxy, EntryGroupState, StateChangedStream as EntryGroupStateChangedStream, +}; + +mod server { + // This is not the full interface, just the methods we need! + // Avahi also implements a newer version of the interface ("org.freedesktop.Avahi.Server2"), but + // the additions are not relevant for us, and the older version is not intended to be deprecated. + // cf. the release notes for 0.8 at https://github.com/avahi/avahi/blob/master/docs/NEWS + #[zbus::proxy( + interface = "org.freedesktop.Avahi.Server", + default_service = "org.freedesktop.Avahi", + default_path = "/", + gen_blocking = false + )] + trait Server { + /// EntryGroupNew method + #[zbus(object = "super::entry_group::EntryGroup")] + fn entry_group_new(&self); + + /// GetState method + fn get_state(&self) -> zbus::Result; + + /// StateChanged signal + #[zbus(signal)] + fn state_changed(&self, state: i32, error: &str) -> zbus::Result<()>; + } +} + +mod entry_group { + use serde::Deserialize; + use zbus::zvariant; + + #[derive(Clone, Copy, Debug, Deserialize)] + #[repr(i32)] + pub enum EntryGroupState { + // The group has not yet been committed, the user must still call avahi_entry_group_commit() + Uncommited = 0, + // The entries of the group are currently being registered + Registering = 1, + // The entries have successfully been established + Established = 2, + // A name collision for one of the entries in the group has been detected, the entries have been withdrawn + Collision = 3, + // Some kind of failure happened, the entries have been withdrawn + Failure = 4, + } + + impl zvariant::Type for EntryGroupState { + fn signature() -> zvariant::Signature<'static> { + zvariant::Signature::try_from("i").unwrap() + } + } + + #[zbus::proxy( + interface = "org.freedesktop.Avahi.EntryGroup", + default_service = "org.freedesktop.Avahi", + gen_blocking = false + )] + trait EntryGroup { + /// AddAddress method + fn add_address( + &self, + interface: i32, + protocol: i32, + flags: u32, + name: &str, + address: &str, + ) -> zbus::Result<()>; + + /// AddRecord method + #[allow(clippy::too_many_arguments)] + fn add_record( + &self, + interface: i32, + protocol: i32, + flags: u32, + name: &str, + clazz: u16, + type_: u16, + ttl: u32, + rdata: &[u8], + ) -> zbus::Result<()>; + + /// AddService method + #[allow(clippy::too_many_arguments)] + fn add_service( + &self, + interface: i32, + protocol: i32, + flags: u32, + name: &str, + type_: &str, + domain: &str, + host: &str, + port: u16, + txt: &[&[u8]], + ) -> zbus::Result<()>; + + /// AddServiceSubtype method + #[allow(clippy::too_many_arguments)] + fn add_service_subtype( + &self, + interface: i32, + protocol: i32, + flags: u32, + name: &str, + type_: &str, + domain: &str, + subtype: &str, + ) -> zbus::Result<()>; + + /// Commit method + fn commit(&self) -> zbus::Result<()>; + + /// Free method + fn free(&self) -> zbus::Result<()>; + + /// GetState method + fn get_state(&self) -> zbus::Result; + + /// IsEmpty method + fn is_empty(&self) -> zbus::Result; + + /// Reset method + fn reset(&self) -> zbus::Result<()>; + + /// UpdateServiceTxt method + #[allow(clippy::too_many_arguments)] + fn update_service_txt( + &self, + interface: i32, + protocol: i32, + flags: u32, + name: &str, + type_: &str, + domain: &str, + txt: &[&[u8]], + ) -> zbus::Result<()>; + + /// StateChanged signal + #[zbus(signal)] + fn state_changed(&self, state: EntryGroupState, error: &str) -> zbus::Result<()>; + } +} diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 79648a45b..943b9bff0 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -7,6 +7,7 @@ //! This library uses mDNS and DNS-SD so that other devices can find it, //! and spawns an http server to answer requests of Spotify clients. +mod avahi; mod server; use std::{ @@ -41,6 +42,10 @@ pub const BACKENDS: &[( // If None, the backend is known but wasn't compiled. Option, )] = &[ + #[cfg(feature = "with-avahi")] + ("avahi", Some(launch_avahi)), + #[cfg(not(feature = "with-avahi"))] + ("avahi", None), #[cfg(feature = "with-dns-sd")] ("dns-sd", Some(launch_dns_sd)), #[cfg(not(feature = "with-dns-sd"))] @@ -130,6 +135,66 @@ const DNS_SD_SERVICE_NAME: &str = "_spotify-connect._tcp"; #[allow(unused)] const TXT_RECORD: [&str; 2] = ["VERSION=1.0", "CPath=/"]; +#[cfg(feature = "with-avahi")] +async fn avahi_task(name: Cow<'static, str>, port: u16) -> zbus::Result<()> { + use self::avahi::ServerProxy; + + let conn = zbus::Connection::system().await?; + + // Connect to Avahi and publish the service + let avahi_server = ServerProxy::new(&conn).await?; + log::trace!("Connected to Avahi"); + + let entry_group = avahi_server.entry_group_new().await?; + + entry_group + .add_service( + -1, // AVAHI_IF_UNSPEC + -1, // IPv4 and IPv6 + 0, // flags + &name, + DNS_SD_SERVICE_NAME, // type + "", // domain: let the server choose + "", // host: let the server choose + port, + &TXT_RECORD.map(|s| s.as_bytes()), + ) + .await?; + + entry_group.commit().await?; + log::debug!("Commited zeroconf service with name {}", &name); + + let _: () = std::future::pending().await; + + Ok(()) +} + +#[cfg(feature = "with-avahi")] +fn launch_avahi( + name: Cow<'static, str>, + _zeroconf_ip: Vec, + port: u16, +) -> Result, Error> { + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + tokio::spawn(async move { + tokio::select! { + res = avahi_task(name, port) => { + if let Err(e) = res { + log::error!("Avahi error {}, shutting down discovery", e); + } + }, + _ = shutdown_rx => { + log::debug!("Un-publishing zeroconf service") + // FIXME: Call EntryGroup.Free() and ensure that the future + // continues to be polled until that has completed. + }, + } + }); + + // Dropping the shutdown_tx will wake the shutdown_rx.await + Ok(Box::new(shutdown_tx)) +} + #[cfg(feature = "with-dns-sd")] fn launch_dns_sd( name: Cow<'static, str>, diff --git a/src/main.rs b/src/main.rs index e44300db7..20e96e48e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -645,7 +645,7 @@ fn get_setup() -> Setup { .optopt( ZEROCONF_BACKEND_SHORT, ZEROCONF_BACKEND, - "Zeroconf (MDNS/DNS-SD) backend to use. Valid values are 'dns-sd' and 'libmdns', if librespot is compiled with the corresponding feature flags.", + "Zeroconf (MDNS/DNS-SD) backend to use. Valid values are 'avahi', 'dns-sd' and 'libmdns', if librespot is compiled with the corresponding feature flags.", "BACKEND" ); From c9f0aeb7c0408f71f99e732e4fb7fa6d1a4d6497 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Tue, 24 Sep 2024 11:52:34 +0200 Subject: [PATCH 09/17] bump MSRV to 1.75 required by zbus >= 4 --- .devcontainer/Dockerfile | 2 +- .github/workflows/test.yml | 6 +++--- CHANGELOG.md | 1 + Cargo.toml | 2 +- contrib/Dockerfile | 2 +- oauth/Cargo.toml | 2 +- 6 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index a28250673..ce845a52b 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1 ARG debian_version=slim-bookworm -ARG rust_version=1.74.0 +ARG rust_version=1.75.0 FROM rust:${rust_version}-${debian_version} ARG DEBIAN_FRONTEND=noninteractive diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e0a0203ac..2e945fac7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -109,7 +109,7 @@ jobs: matrix: os: [ubuntu-latest] toolchain: - - "1.74" # MSRV (Minimum supported rust version) + - "1.75" # MSRV (Minimum supported rust version) - stable experimental: [false] # Ignore failures in beta @@ -164,7 +164,7 @@ jobs: matrix: os: [windows-latest] toolchain: - - "1.74" # MSRV (Minimum supported rust version) + - "1.75" # MSRV (Minimum supported rust version) - stable steps: - name: Checkout code @@ -215,7 +215,7 @@ jobs: - aarch64-unknown-linux-gnu - riscv64gc-unknown-linux-gnu toolchain: - - "1.74" # MSRV (Minimum supported rust version) + - "1.75" # MSRV (Minimum supported rust version) - stable steps: - name: Checkout code diff --git a/CHANGELOG.md b/CHANGELOG.md index fdedb5544..14eb14639 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - [core] The `access_token` for http requests is now acquired by `login5` +- [core] MSRV is now 1.75 (breaking) ### Added diff --git a/Cargo.toml b/Cargo.toml index 090e57b2a..c79c140cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "librespot" version = "0.5.0" -rust-version = "1.74" +rust-version = "1.75" authors = ["Librespot Org"] license = "MIT" description = "An open source client library for Spotify, with support for Spotify Connect" diff --git a/contrib/Dockerfile b/contrib/Dockerfile index cf9725823..a36fef88c 100644 --- a/contrib/Dockerfile +++ b/contrib/Dockerfile @@ -29,7 +29,7 @@ RUN apt-get install -y curl git build-essential crossbuild-essential-arm64 cross RUN apt-get install -y libasound2-dev libasound2-dev:arm64 libasound2-dev:armel libasound2-dev:armhf RUN apt-get install -y libpulse0 libpulse0:arm64 libpulse0:armel libpulse0:armhf -RUN curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain 1.74 -y +RUN curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain 1.75 -y ENV PATH="/root/.cargo/bin/:${PATH}" RUN rustup target add aarch64-unknown-linux-gnu RUN rustup target add arm-unknown-linux-gnueabi diff --git a/oauth/Cargo.toml b/oauth/Cargo.toml index 3d52555ed..7426b87fc 100644 --- a/oauth/Cargo.toml +++ b/oauth/Cargo.toml @@ -15,4 +15,4 @@ thiserror = "1.0" url = "2.2" [dev-dependencies] -env_logger = { version = "0.11.2", default-features = false, features = ["color", "humantime", "auto-color"] } \ No newline at end of file +env_logger = { version = "0.11.2", default-features = false, features = ["color", "humantime", "auto-color"] } From 582537a0313b1d5b359d1e791901b0d80b613f00 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:36:04 +0200 Subject: [PATCH 10/17] update Cargo.lock and docs --- CHANGELOG.md | 4 + COMPILING.md | 11 ++ Cargo.lock | 442 +++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 446 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14eb14639..e585ea8bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,11 +11,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [core] The `access_token` for http requests is now acquired by `login5` - [core] MSRV is now 1.75 (breaking) +- [discovery] librespot can now be compiled with multiple MDNS/DNS-SD backends + (avahi, dns_sd, libmdns) which can be selected using a CLI flag. The defaults + are unchanged (breaking). ### Added - [core] Add `login` (mobile) and `auth_token` retrieval via login5 - [core] Add `OS` and `os_version` to `config.rs` +- [discovery] Added a new MDNS/DNS-SD backend which connects to Avahi via D-Bus. ### Removed diff --git a/COMPILING.md b/COMPILING.md index d5b94b0e6..4b4b58af1 100644 --- a/COMPILING.md +++ b/COMPILING.md @@ -56,6 +56,17 @@ On Fedora systems: sudo dnf install alsa-lib-devel ``` +### Zeroconf library dependencies +Depending on the chosen backend, specific development libraries are required. + +*_Note this is an non-exhaustive list, open a PR to add to it!_* + +| Zeroconf backend | Debian/Ubuntu | Fedora | macOS | +|--------------------|------------------------------|-----------------------------------|-------------| +|avahi | | | | +|dns_sd | `libavahi-compat-libdnssd-dev pkg-config` | `avahi-compat-libdns_sd-devel` | | +|libmdns (default) | | | | + ### Getting the Source The recommended method is to first fork the repo, so that you have a copy that you have read/write access to. After that, it’s a simple case of cloning your fork. diff --git a/Cargo.lock b/Cargo.lock index aed760d25..028f41924 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,6 +135,114 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-broadcast" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-io" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "444b0228950ee6501b3568d3c93bf1176a1fdbc3b758dcd9475046d30f4dc7e8" +dependencies = [ + "async-lock", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "tracing", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-process" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb" +dependencies = [ + "async-channel", + "async-io", + "async-lock", + "async-signal", + "async-task", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "rustix", + "tracing", +] + +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + +[[package]] +name = "async-signal" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "637e00349800c0bdf8bfc21ebbc0b6524abea702b0da4168ac00d070d0c0b9f3" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix", + "signal-hook-registry", + "slab", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.83" @@ -292,6 +400,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -358,6 +479,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.38" @@ -419,6 +546,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -494,6 +630,12 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -654,6 +796,33 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endi" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3d8a32ae18130a3c84dd492d4215c3d913c3b07c6b63c2eb3eb7ff1101ab7bf" + +[[package]] +name = "enumflags2" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d232db7f5956f3f14313dc2f87985c58bd2c695ce124c8cdd984e08e15ac133d" +dependencies = [ + "enumflags2_derive", + "serde", +] + +[[package]] +name = "enumflags2_derive" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de0d48a183585823424a4ce1aa132d174a6a81bd540895822eb4c8373a8e49e8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "env_filter" version = "0.1.2" @@ -692,6 +861,27 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.1.1" @@ -773,6 +963,19 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -1152,6 +1355,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "hex" version = "0.4.3" @@ -1820,10 +2029,12 @@ dependencies = [ "librespot-core", "log", "rand", + "serde", "serde_json", "sha1", "thiserror", "tokio", + "zbus", ] [[package]] @@ -1930,6 +2141,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1957,7 +2177,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", "wasi", "windows-sys 0.52.0", @@ -2013,6 +2233,19 @@ dependencies = [ "jni-sys", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "cfg_aliases", + "libc", + "memoffset", +] + [[package]] name = "no-std-compat" version = "0.4.1" @@ -2252,6 +2485,22 @@ dependencies = [ "paste", ] +[[package]] +name = "ordered-stream" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aa2b01e1d916879f73a53d01d1d6cee68adbb31d6d9177a8cfce093cced1d50" +dependencies = [ + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -2331,6 +2580,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -2358,6 +2618,21 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "polling" +version = "3.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc2790cd301dec6cd3b7a025e4815cf825724a51c98dccfe6a3e55f05ffb6511" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi 0.4.0", + "pin-project-lite", + "rustix", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "portable-atomic" version = "1.9.0" @@ -2441,9 +2716,9 @@ dependencies = [ [[package]] name = "protobuf" -version = "3.6.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3018844a02746180074f621e847703737d27d89d7f0721a7a4da317f88b16385" +checksum = "a3a7c64d9bf75b1b8d981124c14c179074e8caa7dfe7b6a12e6222ddcd0c8f72" dependencies = [ "once_cell", "protobuf-support", @@ -2452,9 +2727,9 @@ dependencies = [ [[package]] name = "protobuf-codegen" -version = "3.6.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "411c15a212b4de05eb8bc989fd066a74c86bd3c04e27d6e86bd7703b806d7734" +checksum = "e26b833f144769a30e04b1db0146b2aaa53fd2fd83acf10a6b5f996606c18144" dependencies = [ "anyhow", "once_cell", @@ -2467,9 +2742,9 @@ dependencies = [ [[package]] name = "protobuf-parse" -version = "3.6.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06f45f16b522d92336e839b5e40680095a045e36a1e7f742ba682ddc85236772" +checksum = "322330e133eab455718444b4e033ebfac7c6528972c784fcde28d2cc783c6257" dependencies = [ "anyhow", "indexmap", @@ -2483,9 +2758,9 @@ dependencies = [ [[package]] name = "protobuf-support" -version = "3.6.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faf96d872914fcda2b66d66ea3fff2be7c66865d31c7bb2790cff32c0e714880" +checksum = "b088fd20b938a875ea00843b6faf48579462630015c3788d397ad6a786663252" dependencies = [ "thiserror", ] @@ -2945,6 +3220,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "serde_spanned" version = "0.6.8" @@ -3078,6 +3364,12 @@ dependencies = [ "der", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" @@ -3361,6 +3653,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.52.0", ] @@ -3494,9 +3787,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -3538,6 +3843,17 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "uds_windows" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89daebc3e6fd160ac4aa9fc8b3bf71e1f74fbf92367ae71fb83a037e8bf164b9" +dependencies = [ + "memoffset", + "tempfile", + "winapi", +] + [[package]] name = "unicode-bidi" version = "0.3.17" @@ -3597,9 +3913,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ "getrandom", "rand", @@ -4147,6 +4463,73 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "xdg-home" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec1cdab258fb55c0da61328dc52c8764709b249011b2cad0454c72f0bf10a1f6" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + +[[package]] +name = "zbus" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb97012beadd29e654708a0fdb4c84bc046f537aecfde2c3ee0a9e4b4d48c725" +dependencies = [ + "async-broadcast", + "async-process", + "async-recursion", + "async-trait", + "enumflags2", + "event-listener", + "futures-core", + "futures-sink", + "futures-util", + "hex", + "nix", + "ordered-stream", + "rand", + "serde", + "serde_repr", + "sha1", + "static_assertions", + "tokio", + "tracing", + "uds_windows", + "windows-sys 0.52.0", + "xdg-home", + "zbus_macros", + "zbus_names", + "zvariant", +] + +[[package]] +name = "zbus_macros" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267db9407081e90bbfa46d841d3cbc60f59c0351838c4bc65199ecd79ab1983e" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.79", + "zvariant_utils", +] + +[[package]] +name = "zbus_names" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b9b1fef7d021261cc16cba64c351d291b715febe0fa10dc3a443ac5a5022e6c" +dependencies = [ + "serde", + "static_assertions", + "zvariant", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -4173,3 +4556,40 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + +[[package]] +name = "zvariant" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2084290ab9a1c471c38fc524945837734fbf124487e105daec2bb57fd48c81fe" +dependencies = [ + "endi", + "enumflags2", + "serde", + "static_assertions", + "zvariant_derive", +] + +[[package]] +name = "zvariant_derive" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73e2ba546bda683a90652bac4a279bc146adad1386f25379cf73200d2002c449" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.79", + "zvariant_utils", +] + +[[package]] +name = "zvariant_utils" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c51bcff7cc3dbb5055396bcf774748c3dab426b4b8659046963523cee4808340" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] From 17cdfd8f82e80cb4387663751068f48f00b0bc8b Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Tue, 24 Sep 2024 17:53:07 +0200 Subject: [PATCH 11/17] discovery: ensure that server and dns-sd backend shutdown gracefully Previously, on drop the the shutdown_tx/close_tx, it wasn't guaranteed the corresponding tasks would continue to be polled until they actually completed their shutdown. Since dns_sd::Service is not Send and non-async, and because libmdns is non-async, put them on their own threads. --- discovery/src/lib.rs | 148 ++++++++++++++++++++++++++++------------ discovery/src/server.rs | 18 ++++- src/main.rs | 21 ++++-- 3 files changed, 136 insertions(+), 51 deletions(-) diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 943b9bff0..3bee07786 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -11,8 +11,8 @@ mod avahi; mod server; use std::{ - any::Any, borrow::Cow, + convert::Infallible, error::Error as StdError, pin::Pin, task::{Context, Poll}, @@ -20,6 +20,7 @@ use std::{ use futures_core::Stream; use thiserror::Error; +use tokio::sync::oneshot; use self::server::DiscoveryServer; @@ -32,8 +33,32 @@ pub use crate::core::authentication::Credentials; /// Determining the icon in the list of available devices. pub use crate::core::config::DeviceType; +pub enum DiscoveryEvent { + Credentials(Credentials), + ServerError(DiscoveryError), + ZeroconfError(DiscoveryError), +} + +pub struct DnsSdHandle { + task_handle: tokio::task::JoinHandle<()>, + shutdown_tx: oneshot::Sender, +} + +impl DnsSdHandle { + async fn shutdown(self) { + log::debug!("Shutting down zeroconf responder"); + let Self { + task_handle, + shutdown_tx, + } = self; + std::mem::drop(shutdown_tx); + let _ = task_handle.await; + log::debug!("Zeroconf responder stopped"); + } +} + pub type ServiceBuilder = - fn(Cow<'static, str>, Vec, u16) -> Result, Error>; + fn(Cow<'static, str>, Vec, u16) -> Result; // Default goes first: This matches the behaviour when feature flags were exlusive, i.e. when there // was only `feature = "with-dns-sd"` or `not(feature = "with-dns-sd")` @@ -88,7 +113,7 @@ pub struct Discovery { /// An opaque handle to the DNS-SD service. Dropping this will unregister the service. #[allow(unused)] - svc: Box, + svc: DnsSdHandle, } /// A builder for [`Discovery`]. @@ -136,7 +161,11 @@ const DNS_SD_SERVICE_NAME: &str = "_spotify-connect._tcp"; const TXT_RECORD: [&str; 2] = ["VERSION=1.0", "CPath=/"]; #[cfg(feature = "with-avahi")] -async fn avahi_task(name: Cow<'static, str>, port: u16) -> zbus::Result<()> { +async fn avahi_task( + name: Cow<'static, str>, + port: u16, + entry_group: &mut Option>, +) -> zbus::Result<()> { use self::avahi::ServerProxy; let conn = zbus::Connection::system().await?; @@ -145,9 +174,11 @@ async fn avahi_task(name: Cow<'static, str>, port: u16) -> zbus::Result<()> { let avahi_server = ServerProxy::new(&conn).await?; log::trace!("Connected to Avahi"); - let entry_group = avahi_server.entry_group_new().await?; + *entry_group = Some(avahi_server.entry_group_new().await?); entry_group + .as_mut() + .unwrap() .add_service( -1, // AVAHI_IF_UNSPEC -1, // IPv4 and IPv6 @@ -161,7 +192,7 @@ async fn avahi_task(name: Cow<'static, str>, port: u16) -> zbus::Result<()> { ) .await?; - entry_group.commit().await?; + entry_group.as_mut().unwrap().commit().await?; log::debug!("Commited zeroconf service with name {}", &name); let _: () = std::future::pending().await; @@ -174,25 +205,32 @@ fn launch_avahi( name: Cow<'static, str>, _zeroconf_ip: Vec, port: u16, -) -> Result, Error> { - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); - tokio::spawn(async move { +) -> Result { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let task_handle = tokio::spawn(async move { + let mut entry_group = None; tokio::select! { - res = avahi_task(name, port) => { + res = avahi_task(name, port, &mut entry_group) => { if let Err(e) = res { log::error!("Avahi error {}, shutting down discovery", e); } }, _ = shutdown_rx => { - log::debug!("Un-publishing zeroconf service") - // FIXME: Call EntryGroup.Free() and ensure that the future - // continues to be polled until that has completed. + if let Some(entry_group) = entry_group.as_mut() { + if let Err(e) = entry_group.free().await { + log::warn!("Failed to un-publish zeroconf service: {}", e); + } else { + log::debug!("Un-published zeroconf service"); + } + } }, } }); - // Dropping the shutdown_tx will wake the shutdown_rx.await - Ok(Box::new(shutdown_tx)) + Ok(DnsSdHandle { + task_handle, + shutdown_tx, + }) } #[cfg(feature = "with-dns-sd")] @@ -200,18 +238,29 @@ fn launch_dns_sd( name: Cow<'static, str>, _zeroconf_ip: Vec, port: u16, -) -> Result, Error> { - let svc = dns_sd::DNSService::register( - Some(name.as_ref()), - DNS_SD_SERVICE_NAME, - None, - None, - port, - &TXT_RECORD, - ) - .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))?; - - Ok(Box::new(svc)) +) -> Result { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let task_handle = tokio::task::spawn_blocking(move || { + let svc = dns_sd::DNSService::register( + Some(name.as_ref()), + DNS_SD_SERVICE_NAME, + None, + None, + port, + &TXT_RECORD, + ) + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e))) + .unwrap(); + + let _ = shutdown_rx.blocking_recv(); + + std::mem::drop(svc); + }); + + Ok(DnsSdHandle { + shutdown_tx, + task_handle, + }) } #[cfg(feature = "with-libmdns")] @@ -219,21 +268,32 @@ fn launch_libmdns( name: Cow<'static, str>, zeroconf_ip: Vec, port: u16, -) -> Result, Error> { - let svc = if !zeroconf_ip.is_empty() { - libmdns::Responder::spawn_with_ip_list(&tokio::runtime::Handle::current(), zeroconf_ip) - } else { - libmdns::Responder::spawn(&tokio::runtime::Handle::current()) - } - .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))? - .register( - DNS_SD_SERVICE_NAME.to_owned(), - name.into_owned(), - port, - &TXT_RECORD, - ); - - Ok(Box::new(svc)) +) -> Result { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let task_handle = tokio::task::spawn_blocking(move || { + let svc = if !zeroconf_ip.is_empty() { + libmdns::Responder::spawn_with_ip_list(&tokio::runtime::Handle::current(), zeroconf_ip) + } else { + libmdns::Responder::spawn(&tokio::runtime::Handle::current()) + } + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e))) + .unwrap() + .register( + DNS_SD_SERVICE_NAME.to_owned(), + name.into_owned(), + port, + &TXT_RECORD, + ); + + let _ = shutdown_rx.blocking_recv(); + + std::mem::drop(svc); + }); + + Ok(DnsSdHandle { + shutdown_tx, + task_handle, + }) } impl Builder { @@ -317,6 +377,10 @@ impl Discovery { pub fn new>(device_id: T, client_id: T) -> Result { Self::builder(device_id, client_id).launch() } + + pub async fn shutdown(self) { + tokio::join!(self.server.shutdown(), self.svc.shutdown(),); + } } impl Stream for Discovery { diff --git a/discovery/src/server.rs b/discovery/src/server.rs index f3c979b9d..e7a940b61 100644 --- a/discovery/src/server.rs +++ b/discovery/src/server.rs @@ -260,7 +260,8 @@ impl RequestHandler { pub struct DiscoveryServer { cred_rx: mpsc::UnboundedReceiver, - _close_tx: oneshot::Sender, + close_tx: oneshot::Sender, + task_handle: tokio::task::JoinHandle<()>, } impl DiscoveryServer { @@ -297,7 +298,7 @@ impl DiscoveryServer { } } - tokio::spawn(async move { + let task_handle = tokio::spawn(async move { let discovery = Arc::new(discovery); let server = hyper::server::conn::http1::Builder::new(); @@ -338,9 +339,20 @@ impl DiscoveryServer { Ok(Self { cred_rx, - _close_tx: close_tx, + close_tx, + task_handle, }) } + + pub async fn shutdown(self) { + let Self { + close_tx, + task_handle, + .. + } = self; + std::mem::drop(close_tx); + let _ = task_handle.await; + } } impl Stream for DiscoveryServer { diff --git a/src/main.rs b/src/main.rs index 20e96e48e..b3e9402e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1989,18 +1989,27 @@ async fn main() { info!("Gracefully shutting down"); + let mut shutdown_tasks = tokio::task::JoinSet::new(); + // Shutdown spirc if necessary if let Some(spirc) = spirc { if let Err(e) = spirc.shutdown() { error!("error sending spirc shutdown message: {}", e); } - if let Some(mut spirc_task) = spirc_task { - tokio::select! { - _ = tokio::signal::ctrl_c() => (), - _ = spirc_task.as_mut() => (), - else => (), - } + if let Some(spirc_task) = spirc_task { + shutdown_tasks.spawn(spirc_task); } } + + if let Some(discovery) = discovery { + shutdown_tasks.spawn(discovery.shutdown()); + } + + tokio::select! { + _ = tokio::signal::ctrl_c() => (), + _ = async { + while shutdown_tasks.join_next().await.is_some() {} + } => (), + } } From b340f3b9e678e929c3fbf77bc7a2b511824ceb09 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Tue, 24 Sep 2024 23:23:14 +0200 Subject: [PATCH 12/17] discovery: use a shared channel for server and zeroconf status messages --- discovery/src/lib.rs | 133 ++++++++++++++++++++++++++++------------ discovery/src/server.rs | 40 +++++------- 2 files changed, 108 insertions(+), 65 deletions(-) diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 3bee07786..6610bac48 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -20,7 +20,7 @@ use std::{ use futures_core::Stream; use thiserror::Error; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use self::server::DiscoveryServer; @@ -57,8 +57,12 @@ impl DnsSdHandle { } } -pub type ServiceBuilder = - fn(Cow<'static, str>, Vec, u16) -> Result; +pub type ServiceBuilder = fn( + Cow<'static, str>, + Vec, + u16, + mpsc::UnboundedSender, +) -> Result; // Default goes first: This matches the behaviour when feature flags were exlusive, i.e. when there // was only `feature = "with-dns-sd"` or `not(feature = "with-dns-sd")` @@ -114,6 +118,8 @@ pub struct Discovery { /// An opaque handle to the DNS-SD service. Dropping this will unregister the service. #[allow(unused)] svc: DnsSdHandle, + + event_rx: mpsc::UnboundedReceiver, } /// A builder for [`Discovery`]. @@ -143,6 +149,13 @@ pub enum DiscoveryError { ParamsError(&'static str), } +#[cfg(feature = "with-avahi")] +impl From for DiscoveryError { + fn from(error: zbus::Error) -> Self { + Self::DnsSdError(Box::new(error)) + } +} + impl From for Error { fn from(err: DiscoveryError) -> Self { match err { @@ -165,7 +178,7 @@ async fn avahi_task( name: Cow<'static, str>, port: u16, entry_group: &mut Option>, -) -> zbus::Result<()> { +) -> Result<(), DiscoveryError> { use self::avahi::ServerProxy; let conn = zbus::Connection::system().await?; @@ -205,14 +218,17 @@ fn launch_avahi( name: Cow<'static, str>, _zeroconf_ip: Vec, port: u16, + status_tx: mpsc::UnboundedSender, ) -> Result { let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let task_handle = tokio::spawn(async move { let mut entry_group = None; tokio::select! { res = avahi_task(name, port, &mut entry_group) => { if let Err(e) = res { - log::error!("Avahi error {}, shutting down discovery", e); + log::error!("Avahi error: {}", e); + let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e)); } }, _ = shutdown_rx => { @@ -238,23 +254,33 @@ fn launch_dns_sd( name: Cow<'static, str>, _zeroconf_ip: Vec, port: u16, + status_tx: mpsc::UnboundedSender, ) -> Result { let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task_handle = tokio::task::spawn_blocking(move || { - let svc = dns_sd::DNSService::register( - Some(name.as_ref()), - DNS_SD_SERVICE_NAME, - None, - None, - port, - &TXT_RECORD, - ) - .map_err(|e| DiscoveryError::DnsSdError(Box::new(e))) - .unwrap(); - - let _ = shutdown_rx.blocking_recv(); - std::mem::drop(svc); + let task_handle = tokio::task::spawn_blocking(move || { + let inner = move || -> Result<(), DiscoveryError> { + let svc = dns_sd::DNSService::register( + Some(name.as_ref()), + DNS_SD_SERVICE_NAME, + None, + None, + port, + &TXT_RECORD, + ) + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))?; + + let _ = shutdown_rx.blocking_recv(); + + std::mem::drop(svc); + + Ok(()) + }; + + if let Err(e) = inner() { + log::error!("dns_sd error: {}", e); + let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e)); + } }); Ok(DnsSdHandle { @@ -268,26 +294,40 @@ fn launch_libmdns( name: Cow<'static, str>, zeroconf_ip: Vec, port: u16, + status_tx: mpsc::UnboundedSender, ) -> Result { let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let task_handle = tokio::task::spawn_blocking(move || { - let svc = if !zeroconf_ip.is_empty() { - libmdns::Responder::spawn_with_ip_list(&tokio::runtime::Handle::current(), zeroconf_ip) - } else { - libmdns::Responder::spawn(&tokio::runtime::Handle::current()) + let inner = move || -> Result<(), DiscoveryError> { + let svc = if !zeroconf_ip.is_empty() { + libmdns::Responder::spawn_with_ip_list( + &tokio::runtime::Handle::current(), + zeroconf_ip, + ) + } else { + libmdns::Responder::spawn(&tokio::runtime::Handle::current()) + } + .map_err(|e| DiscoveryError::DnsSdError(Box::new(e))) + .unwrap() + .register( + DNS_SD_SERVICE_NAME.to_owned(), + name.into_owned(), + port, + &TXT_RECORD, + ); + + let _ = shutdown_rx.blocking_recv(); + + std::mem::drop(svc); + + Ok(()) + }; + + if let Err(e) = inner() { + log::error!("libmdns error: {}", e); + let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e)); } - .map_err(|e| DiscoveryError::DnsSdError(Box::new(e))) - .unwrap() - .register( - DNS_SD_SERVICE_NAME.to_owned(), - name.into_owned(), - port, - &TXT_RECORD, - ); - - let _ = shutdown_rx.blocking_recv(); - - std::mem::drop(svc); }); Ok(DnsSdHandle { @@ -358,12 +398,18 @@ impl Builder { let name = self.server_config.name.clone(); let zeroconf_ip = self.zeroconf_ip; + let (event_tx, event_rx) = mpsc::unbounded_channel(); + let mut port = self.port; - let server = DiscoveryServer::new(self.server_config, &mut port)?; + let server = DiscoveryServer::new(self.server_config, &mut port, event_tx.clone())?; let launch_svc = self.zeroconf_backend.unwrap_or(find(None)?); - let svc = launch_svc(name, zeroconf_ip, port)?; - Ok(Discovery { server, svc }) + let svc = launch_svc(name, zeroconf_ip, port, event_tx)?; + Ok(Discovery { + server, + svc, + event_rx, + }) } } @@ -387,6 +433,15 @@ impl Stream for Discovery { type Item = Credentials; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.server).poll_next(cx) + match Pin::new(&mut self.event_rx).poll_recv(cx) { + // Yields credentials + Poll::Ready(Some(DiscoveryEvent::Credentials(creds))) => Poll::Ready(Some(creds)), + // Also terminate the stream on fatal server or MDNS/DNS-SD errors. + Poll::Ready(Some( + DiscoveryEvent::ServerError(_) | DiscoveryEvent::ZeroconfError(_), + )) => Poll::Ready(None), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } } } diff --git a/discovery/src/server.rs b/discovery/src/server.rs index e7a940b61..683af854e 100644 --- a/discovery/src/server.rs +++ b/discovery/src/server.rs @@ -3,16 +3,13 @@ use std::{ collections::BTreeMap, convert::Infallible, net::{Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener}, - pin::Pin, sync::{Arc, Mutex}, - task::{Context, Poll}, }; use aes::cipher::{KeyIvInit, StreamCipher}; use base64::engine::general_purpose::STANDARD as BASE64; use base64::engine::Engine as _; use bytes::Bytes; -use futures_core::Stream; use futures_util::{FutureExt, TryFutureExt}; use hmac::{Hmac, Mac}; use http_body_util::{BodyExt, Full}; @@ -24,7 +21,7 @@ use serde_json::json; use sha1::{Digest, Sha1}; use tokio::sync::{mpsc, oneshot}; -use super::DiscoveryError; +use super::{DiscoveryError, DiscoveryEvent}; use crate::{ core::config::DeviceType, @@ -47,21 +44,17 @@ struct RequestHandler { config: Config, username: Mutex>, keys: DhLocalKeys, - tx: mpsc::UnboundedSender, + event_tx: mpsc::UnboundedSender, } impl RequestHandler { - fn new(config: Config) -> (Self, mpsc::UnboundedReceiver) { - let (tx, rx) = mpsc::unbounded_channel(); - - let discovery = Self { + fn new(config: Config, event_tx: mpsc::UnboundedSender) -> Self { + Self { config, username: Mutex::new(None), keys: DhLocalKeys::random(&mut rand::thread_rng()), - tx, - }; - - (discovery, rx) + event_tx, + } } fn active_user(&self) -> String { @@ -202,7 +195,8 @@ impl RequestHandler { { let maybe_username = self.username.lock(); - self.tx.send(credentials)?; + self.event_tx + .send(DiscoveryEvent::Credentials(credentials))?; if let Ok(mut username_field) = maybe_username { *username_field = Some(String::from(username)); } else { @@ -259,14 +253,17 @@ impl RequestHandler { } pub struct DiscoveryServer { - cred_rx: mpsc::UnboundedReceiver, close_tx: oneshot::Sender, task_handle: tokio::task::JoinHandle<()>, } impl DiscoveryServer { - pub fn new(config: Config, port: &mut u16) -> Result { - let (discovery, cred_rx) = RequestHandler::new(config); + pub fn new( + config: Config, + port: &mut u16, + event_tx: mpsc::UnboundedSender, + ) -> Result { + let discovery = RequestHandler::new(config, event_tx); let address = if cfg!(windows) { SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), *port) } else { @@ -338,7 +335,6 @@ impl DiscoveryServer { }); Ok(Self { - cred_rx, close_tx, task_handle, }) @@ -354,11 +350,3 @@ impl DiscoveryServer { let _ = task_handle.await; } } - -impl Stream for DiscoveryServer { - type Item = Credentials; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.cred_rx.poll_recv(cx) - } -} From 692cde006949b27098667e41ce4d0109c0856b29 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Tue, 24 Sep 2024 23:49:13 +0200 Subject: [PATCH 13/17] discovery: add Avahi reconnection logic This deals gracefully with the case where the Avahi daemon is restarted or not running initially. --- discovery/src/lib.rs | 140 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 115 insertions(+), 25 deletions(-) diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 6610bac48..d8f039520 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -179,38 +179,128 @@ async fn avahi_task( port: u16, entry_group: &mut Option>, ) -> Result<(), DiscoveryError> { - use self::avahi::ServerProxy; + use self::avahi::{EntryGroupState, ServerProxy}; + use futures_util::StreamExt; let conn = zbus::Connection::system().await?; - // Connect to Avahi and publish the service - let avahi_server = ServerProxy::new(&conn).await?; - log::trace!("Connected to Avahi"); - - *entry_group = Some(avahi_server.entry_group_new().await?); - - entry_group - .as_mut() - .unwrap() - .add_service( - -1, // AVAHI_IF_UNSPEC - -1, // IPv4 and IPv6 - 0, // flags - &name, - DNS_SD_SERVICE_NAME, // type - "", // domain: let the server choose - "", // host: let the server choose - port, - &TXT_RECORD.map(|s| s.as_bytes()), - ) + // Wait for the daemon to show up. + // On error: Failed to listen for NameOwnerChanged signal => Fatal DBus issue + let bus = zbus::fdo::DBusProxy::new(&conn).await?; + let mut stream = bus + .receive_name_owner_changed_with_args(&[(0, "org.freedesktop.Avahi")]) .await?; - entry_group.as_mut().unwrap().commit().await?; - log::debug!("Commited zeroconf service with name {}", &name); + loop { + // Wait for Avahi daemon to be started + 'wait_avahi: { + while let Poll::Ready(Some(_)) = futures_util::poll!(stream.next()) { + // Drain queued name owner changes, since we're going to connect in a second + } + + // Ping after we connected to the signal since it might have shown up in the meantime + if let Ok(avahi_peer) = + zbus::fdo::PeerProxy::new(&conn, "org.freedesktop.Avahi", "/").await + { + if avahi_peer.ping().await.is_ok() { + log::debug!("Pinged Avahi: Available"); + break 'wait_avahi; + } + } + log::warn!("Failed to connect to Avahi, zeroconf discovery will not work until avahi-daemon is started. Check that it is installed and running"); + + // If it didn't, wait for the signal + match stream.next().await { + Some(_signal) => { + log::debug!("Avahi appeared"); + break 'wait_avahi; + } + // The stream ended, but this should never happen + None => { + return Err(zbus::Error::Failure("DBus disappeared".to_owned()).into()); + } + } + } + + // Connect to Avahi and publish the service + let avahi_server = ServerProxy::new(&conn).await?; + log::trace!("Connected to Avahi"); + + *entry_group = Some(avahi_server.entry_group_new().await?); - let _: () = std::future::pending().await; + let mut entry_group_state_stream = entry_group + .as_mut() + .unwrap() + .receive_state_changed() + .await?; - Ok(()) + entry_group + .as_mut() + .unwrap() + .add_service( + -1, // AVAHI_IF_UNSPEC + -1, // IPv4 and IPv6 + 0, // flags + &name, + DNS_SD_SERVICE_NAME, // type + "", // domain: let the server choose + "", // host: let the server choose + port, + &TXT_RECORD.map(|s| s.as_bytes()), + ) + .await?; + + entry_group.as_mut().unwrap().commit().await?; + log::debug!("Commited zeroconf service with name {}", &name); + + 'monitor_service: loop { + tokio::select! { + Some(state_changed) = entry_group_state_stream.next() => { + let (state, error) = match state_changed.args() { + Ok(sc) => (sc.state, sc.error), + Err(e) => { + log::warn!("Error on receiving EntryGroup state from Avahi: {}", e); + continue 'monitor_service; + } + }; + match state { + EntryGroupState::Uncommited | EntryGroupState::Registering => { + // Not yet registered, ignore. + } + EntryGroupState::Established => { + log::info!("Published zeroconf service"); + } + EntryGroupState::Collision => { + // This most likely means that librespot has unintentionally been started twice. + // Thus, don't retry with a new name, but abort. + // + // Note that the error would usually already be returned by + // entry_group.add_service above, so this state_changed handler + // won't be hit. + // + // EntryGroup has been withdrawn at this point already! + log::error!("zeroconf collision for name '{}'", &name); + return Err(zbus::Error::Failure(format!("zeroconf collision for name: {}", name)).into()); + } + EntryGroupState::Failure => { + // TODO: Back off/treat as fatal? + // EntryGroup has been withdrawn at this point already! + // There seems to be no code in Avahi that actually sets this state. + log::error!("zeroconf failure: {}", error); + return Err(zbus::Error::Failure(format!("zeroconf failure: {}", error)).into()); + } + } + } + _name_owner_change = stream.next() => { + break 'monitor_service; + } + } + } + + // Avahi disappeared (or the service was immediately taken over by a + // new daemon) => drop all handles, and reconnect + log::info!("Avahi disappeared, trying to reconnect"); + } } #[cfg(feature = "with-avahi")] From 36083fc438fae080062e7eb11ea9b7b5075c0612 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Sun, 6 Oct 2024 14:07:58 +0200 Subject: [PATCH 14/17] discovery: allow running when compiled without zeroconf backend... ...but exit with an error if there's no way to authenticate --- src/main.rs | 64 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/src/main.rs b/src/main.rs index b3e9402e2..9d1717f53 100644 --- a/src/main.rs +++ b/src/main.rs @@ -212,12 +212,11 @@ struct Setup { credentials: Option, enable_oauth: bool, oauth_port: Option, - enable_discovery: bool, zeroconf_port: u16, player_event_program: Option, emit_sink_events: bool, zeroconf_ip: Vec, - zeroconf_backend: librespot::discovery::ServiceBuilder, + zeroconf_backend: Option, } fn get_setup() -> Setup { @@ -1199,9 +1198,22 @@ fn get_setup() -> Setup { } }; - let enable_discovery = !opt_present(DISABLE_DISCOVERY); + let no_discovery_reason = if !cfg!(any( + feature = "with-libmdns", + feature = "with-dns-sd", + feature = "with-avahi" + )) { + Some("librespot compiled without zeroconf backend".to_owned()) + } else if opt_present(DISABLE_DISCOVERY) { + Some(format!( + "the `--{}` / `-{}` flag set", + DISABLE_DISCOVERY, DISABLE_DISCOVERY_SHORT, + )) + } else { + None + }; - if credentials.is_none() && !enable_discovery && !enable_oauth { + if credentials.is_none() && no_discovery_reason.is_some() && !enable_oauth { error!("Credentials are required if discovery and oauth login are disabled."); exit(1); } @@ -1234,14 +1246,16 @@ fn get_setup() -> Setup { Some(5588) }; - if !enable_discovery && opt_present(ZEROCONF_PORT) { - warn!( - "With the `--{}` / `-{}` flag set `--{}` / `-{}` has no effect.", - DISABLE_DISCOVERY, DISABLE_DISCOVERY_SHORT, ZEROCONF_PORT, ZEROCONF_PORT_SHORT - ); + if let Some(reason) = no_discovery_reason.as_deref() { + if opt_present(ZEROCONF_PORT) { + warn!( + "With {} `--{}` / `-{}` has no effect.", + reason, ZEROCONF_PORT, ZEROCONF_PORT_SHORT + ); + } } - let zeroconf_port = if enable_discovery { + let zeroconf_port = if no_discovery_reason.is_none() { opt_str(ZEROCONF_PORT) .map(|port| match port.parse::() { Ok(value) if value != 0 => value, @@ -1277,6 +1291,15 @@ fn get_setup() -> Setup { None => SessionConfig::default().autoplay, }; + if let Some(reason) = no_discovery_reason.as_deref() { + if opt_present(ZEROCONF_INTERFACE) { + warn!( + "With {} `--{}` / `-{}` has no effect.", + reason, ZEROCONF_INTERFACE, ZEROCONF_INTERFACE_SHORT + ); + } + } + let zeroconf_ip: Vec = if opt_present(ZEROCONF_INTERFACE) { if let Some(zeroconf_ip) = opt_str(ZEROCONF_INTERFACE) { zeroconf_ip @@ -1302,9 +1325,18 @@ fn get_setup() -> Setup { vec![] }; + if let Some(reason) = no_discovery_reason.as_deref() { + if opt_present(ZEROCONF_BACKEND) { + warn!( + "With {} `--{}` / `-{}` has no effect.", + reason, ZEROCONF_BACKEND, ZEROCONF_BACKEND_SHORT + ); + } + } + let zeroconf_backend_name = opt_str(ZEROCONF_BACKEND); - let zeroconf_backend = librespot::discovery::find(zeroconf_backend_name.as_deref()) - .unwrap_or_else(|_| { + let zeroconf_backend = no_discovery_reason.is_none().then(|| { + librespot::discovery::find(zeroconf_backend_name.as_deref()).unwrap_or_else(|_| { let available_backends: Vec<_> = librespot::discovery::BACKENDS .iter() .filter_map(|(id, launch_svc)| launch_svc.map(|_| *id)) @@ -1323,7 +1355,8 @@ fn get_setup() -> Setup { ); exit(1); - }); + }) + }); let connect_config = { let connect_default_config = ConnectConfig::default(); @@ -1766,7 +1799,6 @@ fn get_setup() -> Setup { credentials, enable_oauth, oauth_port, - enable_discovery, zeroconf_port, player_event_program, emit_sink_events, @@ -1800,7 +1832,7 @@ async fn main() { let mut sys = System::new(); - if setup.enable_discovery { + if let Some(zeroconf_backend) = setup.zeroconf_backend { // When started at boot as a service discovery may fail due to it // trying to bind to interfaces before the network is actually up. // This could be prevented in systemd by starting the service after @@ -1820,7 +1852,7 @@ async fn main() { .is_group(setup.connect_config.is_group) .port(setup.zeroconf_port) .zeroconf_ip(setup.zeroconf_ip.clone()) - .zeroconf_backend(setup.zeroconf_backend) + .zeroconf_backend(zeroconf_backend) .launch() { Ok(d) => break Some(d), From c112ab362cdc22a261130ab697d105fa268fe098 Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Sun, 6 Oct 2024 17:09:28 +0200 Subject: [PATCH 15/17] better error messages for invalid options with no short flag --- src/main.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9d1717f53..66b569eb0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -811,12 +811,22 @@ fn get_setup() -> Setup { exit(0); } + // Can't use `-> fmt::Arguments` due to https://github.com/rust-lang/rust/issues/92698 + fn format_flag(long: &str, short: &str) -> String { + if short.is_empty() { + format!("`--{long}`") + } else { + format!("`--{long}` / `-{short}`") + } + } + let invalid_error_msg = |long: &str, short: &str, invalid: &str, valid_values: &str, default_value: &str| { - error!("Invalid `--{long}` / `-{short}`: \"{invalid}\""); + let flag = format_flag(long, short); + error!("Invalid {flag}: \"{invalid}\""); if !valid_values.is_empty() { - println!("Valid `--{long}` / `-{short}` values: {valid_values}"); + println!("Valid {flag} values: {valid_values}"); } if !default_value.is_empty() { @@ -1294,8 +1304,9 @@ fn get_setup() -> Setup { if let Some(reason) = no_discovery_reason.as_deref() { if opt_present(ZEROCONF_INTERFACE) { warn!( - "With {} `--{}` / `-{}` has no effect.", - reason, ZEROCONF_INTERFACE, ZEROCONF_INTERFACE_SHORT + "With {} {} has no effect.", + reason, + format_flag(ZEROCONF_INTERFACE, ZEROCONF_INTERFACE_SHORT), ); } } From 80e5fc090bd1c6a5e5bb903eb331517fbef0419f Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Tue, 15 Oct 2024 23:55:23 +0200 Subject: [PATCH 16/17] address review --- Cargo.lock | 1 + Cargo.toml | 2 +- discovery/Cargo.toml | 1 + discovery/src/avahi.rs | 4 ++-- discovery/src/lib.rs | 10 +++++----- src/main.rs | 7 +++---- 6 files changed, 13 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 028f41924..7da57b923 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2031,6 +2031,7 @@ dependencies = [ "rand", "serde", "serde_json", + "serde_repr", "sha1", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index c79c140cd..a6d216b3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ log = "0.4" sha1 = "0.10" sysinfo = { version = "0.31.3", default-features = false, features = ["system"] } thiserror = "1.0" -tokio = { version = "1", features = ["rt", "macros", "signal", "sync", "parking_lot", "process"] } +tokio = { version = "1.40", features = ["rt", "macros", "signal", "sync", "parking_lot", "process"] } url = "2.2" [features] diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index e26795f76..fa0746eff 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -25,6 +25,7 @@ libmdns = { version = "0.9", optional = true } log = "0.4" rand = "0.8" serde = { version = "1", default-features = false, features = ["derive"], optional = true } +serde_repr = "0.1" serde_json = "1.0" sha1 = "0.10" thiserror = "1.0" diff --git a/discovery/src/avahi.rs b/discovery/src/avahi.rs index a12a996a2..7c098168e 100644 --- a/discovery/src/avahi.rs +++ b/discovery/src/avahi.rs @@ -34,10 +34,10 @@ mod server { } mod entry_group { - use serde::Deserialize; + use serde_repr::Deserialize_repr; use zbus::zvariant; - #[derive(Clone, Copy, Debug, Deserialize)] + #[derive(Clone, Copy, Debug, Deserialize_repr)] #[repr(i32)] pub enum EntryGroupState { // The group has not yet been committed, the user must still call avahi_entry_group_commit() diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index d8f039520..624bc167a 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -57,7 +57,7 @@ impl DnsSdHandle { } } -pub type ServiceBuilder = fn( +pub type DnsSdServiceBuilder = fn( Cow<'static, str>, Vec, u16, @@ -69,7 +69,7 @@ pub type ServiceBuilder = fn( pub const BACKENDS: &[( &str, // If None, the backend is known but wasn't compiled. - Option, + Option, )] = &[ #[cfg(feature = "with-avahi")] ("avahi", Some(launch_avahi)), @@ -85,7 +85,7 @@ pub const BACKENDS: &[( ("libmdns", None), ]; -pub fn find(name: Option<&str>) -> Result { +pub fn find(name: Option<&str>) -> Result { if let Some(ref name) = name { match BACKENDS.iter().find(|(id, _)| name == id) { Some((_id, Some(launch_svc))) => Ok(*launch_svc), @@ -127,7 +127,7 @@ pub struct Builder { server_config: server::Config, port: u16, zeroconf_ip: Vec, - zeroconf_backend: Option, + zeroconf_backend: Option, } /// Errors that can occur while setting up a [`Discovery`] instance. @@ -468,7 +468,7 @@ impl Builder { } /// Set the zeroconf (MDNS and DNS-SD) implementation to use. - pub fn zeroconf_backend(mut self, zeroconf_backend: ServiceBuilder) -> Self { + pub fn zeroconf_backend(mut self, zeroconf_backend: DnsSdServiceBuilder) -> Self { self.zeroconf_backend = Some(zeroconf_backend); self } diff --git a/src/main.rs b/src/main.rs index 66b569eb0..2da9323a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ use librespot::{ authentication::Credentials, cache::Cache, config::DeviceType, version, Session, SessionConfig, }, + discovery::DnsSdServiceBuilder, playback::{ audio_backend::{self, SinkBuilder, BACKENDS}, config::{ @@ -216,7 +217,7 @@ struct Setup { player_event_program: Option, emit_sink_events: bool, zeroconf_ip: Vec, - zeroconf_backend: Option, + zeroconf_backend: Option, } fn get_setup() -> Setup { @@ -2051,8 +2052,6 @@ async fn main() { tokio::select! { _ = tokio::signal::ctrl_c() => (), - _ = async { - while shutdown_tasks.join_next().await.is_some() {} - } => (), + _ = shutdown_tasks.join_all() => (), } } From e6645f11041db792797dd7ac276c88dada29791e Mon Sep 17 00:00:00 2001 From: wisp3rwind <17089248+wisp3rwind@users.noreply.github.com> Date: Mon, 21 Oct 2024 11:02:55 +0200 Subject: [PATCH 17/17] discovery: revise shutdown handling after review --- discovery/src/lib.rs | 16 +++++++++++----- discovery/src/server.rs | 18 ++++++++++++------ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 624bc167a..d829e0f57 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -12,7 +12,6 @@ mod server; use std::{ borrow::Cow, - convert::Infallible, error::Error as StdError, pin::Pin, task::{Context, Poll}, @@ -39,9 +38,13 @@ pub enum DiscoveryEvent { ZeroconfError(DiscoveryError), } +enum ZeroconfCmd { + Shutdown, +} + pub struct DnsSdHandle { task_handle: tokio::task::JoinHandle<()>, - shutdown_tx: oneshot::Sender, + shutdown_tx: oneshot::Sender, } impl DnsSdHandle { @@ -51,9 +54,12 @@ impl DnsSdHandle { task_handle, shutdown_tx, } = self; - std::mem::drop(shutdown_tx); - let _ = task_handle.await; - log::debug!("Zeroconf responder stopped"); + if shutdown_tx.send(ZeroconfCmd::Shutdown).is_err() { + log::warn!("Zeroconf responder unexpectedly disappeared"); + } else { + let _ = task_handle.await; + log::debug!("Zeroconf responder stopped"); + } } } diff --git a/discovery/src/server.rs b/discovery/src/server.rs index 683af854e..aa66fcb26 100644 --- a/discovery/src/server.rs +++ b/discovery/src/server.rs @@ -1,7 +1,6 @@ use std::{ borrow::Cow, collections::BTreeMap, - convert::Infallible, net::{Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener}, sync::{Arc, Mutex}, }; @@ -252,8 +251,12 @@ impl RequestHandler { } } +pub(crate) enum DiscoveryServerCmd { + Shutdown, +} + pub struct DiscoveryServer { - close_tx: oneshot::Sender, + close_tx: oneshot::Sender, task_handle: tokio::task::JoinHandle<()>, } @@ -324,14 +327,12 @@ impl DiscoveryServer { }); } _ = &mut close_rx => { - debug!("Shutting down discovery server"); break; } } } graceful.shutdown().await; - debug!("Discovery server stopped"); }); Ok(Self { @@ -346,7 +347,12 @@ impl DiscoveryServer { task_handle, .. } = self; - std::mem::drop(close_tx); - let _ = task_handle.await; + log::debug!("Shutting down discovery server"); + if close_tx.send(DiscoveryServerCmd::Shutdown).is_err() { + log::warn!("Discovery server unexpectedly disappeared"); + } else { + let _ = task_handle.await; + log::debug!("Discovery server stopped"); + } } }