diff --git a/Cargo.lock b/Cargo.lock index 0b6d5b0..16226c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,17 +164,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "async-recursion" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.65", -] - [[package]] name = "async-signal" version = "0.2.5" @@ -1997,12 +1986,10 @@ dependencies = [ [[package]] name = "zbus" version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb97012beadd29e654708a0fdb4c84bc046f537aecfde2c3ee0a9e4b4d48c725" +source = "git+https://github.com/dbus2/zbus/#bdf3e0c05ad903b3ac80fc795f2eb71cf3045c13" dependencies = [ "async-broadcast", "async-process", - "async-recursion", "async-trait", "enumflags2", "event-listener 5.3.1", @@ -2020,7 +2007,7 @@ dependencies = [ "tokio", "tracing", "uds_windows", - "windows-sys 0.52.0", + "windows-sys 0.59.0", "xdg-home", "zbus_macros", "zbus_names", @@ -2030,8 +2017,7 @@ dependencies = [ [[package]] name = "zbus_macros" version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267db9407081e90bbfa46d841d3cbc60f59c0351838c4bc65199ecd79ab1983e" +source = "git+https://github.com/dbus2/zbus/#bdf3e0c05ad903b3ac80fc795f2eb71cf3045c13" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2043,8 +2029,7 @@ dependencies = [ [[package]] name = "zbus_names" version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b9b1fef7d021261cc16cba64c351d291b715febe0fa10dc3a443ac5a5022e6c" +source = "git+https://github.com/dbus2/zbus/#bdf3e0c05ad903b3ac80fc795f2eb71cf3045c13" dependencies = [ "serde", "static_assertions", @@ -2054,21 +2039,20 @@ dependencies = [ [[package]] name = "zvariant" version = "4.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2084290ab9a1c471c38fc524945837734fbf124487e105daec2bb57fd48c81fe" +source = "git+https://github.com/dbus2/zbus/#bdf3e0c05ad903b3ac80fc795f2eb71cf3045c13" dependencies = [ "endi", "enumflags2", "serde", "static_assertions", "zvariant_derive", + "zvariant_utils", ] [[package]] name = "zvariant_derive" version = "4.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73e2ba546bda683a90652bac4a279bc146adad1386f25379cf73200d2002c449" +source = "git+https://github.com/dbus2/zbus/#bdf3e0c05ad903b3ac80fc795f2eb71cf3045c13" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2079,11 +2063,13 @@ dependencies = [ [[package]] name = "zvariant_utils" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c51bcff7cc3dbb5055396bcf774748c3dab426b4b8659046963523cee4808340" +version = "2.2.0" +source = "git+https://github.com/dbus2/zbus/#bdf3e0c05ad903b3ac80fc795f2eb71cf3045c13" dependencies = [ + "nom", "proc-macro2", "quote", + "serde", + "static_assertions", "syn 2.0.65", ] diff --git a/Cargo.toml b/Cargo.toml index 05e128b..536bdf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ name = "busd" path = "src/bin/busd.rs" [dependencies] -#zbus = { git = "https://github.com/dbus2/zbus/", features = [ -zbus = { version = "4.2.0", features = [ +zbus = { git = "https://github.com/dbus2/zbus/", features = [ +#zbus = { version = "4.2.0", features = [ "tokio", "bus-impl", ], default-features = false } diff --git a/src/bin/busd.rs b/src/bin/busd.rs index bc2b2cc..a360905 100644 --- a/src/bin/busd.rs +++ b/src/bin/busd.rs @@ -71,9 +71,9 @@ async fn main() -> Result<()> { busd::tracing_subscriber::init(); let args = Args::parse(); + let address = args.address.unwrap_or_else(default_address); - let mut bus = - bus::Bus::for_address(args.address.as_deref(), args.auth_mechanism.into()).await?; + let mut bus = bus::Bus::for_address(&address, args.auth_mechanism.into()).await?; #[cfg(unix)] if let Some(fd) = args.ready_fd { @@ -111,3 +111,26 @@ async fn main() -> Result<()> { Ok(()) } + +#[cfg(unix)] +fn default_address() -> String { + use std::{env, path::Path}; + + let runtime_dir = env::var("XDG_RUNTIME_DIR") + .as_ref() + .map(|s| Path::new(s).to_path_buf()) + .ok() + .unwrap_or_else(|| { + Path::new("/run") + .join("user") + .join(format!("{}", nix::unistd::Uid::current())) + }); + let path = runtime_dir.join("busd-session"); + + format!("unix:path={}", path.display()) +} + +#[cfg(not(unix))] +fn default_address() -> String { + "tcp:host=127.0.0.1,port=4242".to_string() +} diff --git a/src/bus/mod.rs b/src/bus/mod.rs index c0da0e9..1d5ed4a 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -5,19 +5,17 @@ use futures_util::{ future::{select, Either}, pin_mut, }; -#[cfg(unix)] -use std::{env, path::Path}; -use std::{str::FromStr, sync::Arc}; +use std::sync::Arc; #[cfg(unix)] use tokio::fs::remove_file; -use tokio::{spawn, task::JoinHandle}; +use tokio::task::{spawn, JoinHandle}; use tracing::{debug, info, trace, warn}; #[cfg(unix)] -use zbus::address::transport::{Unix, UnixSocket}; +use zbus::address::transport::Unix; use zbus::{ - address::{transport::Tcp, Transport}, - connection::socket::BoxedSplit, - Address, AuthMechanism, Connection, ConnectionBuilder, Guid, OwnedGuid, + address::transport::{Tcp, Transport}, + connection::{self, socket::BoxedSplit}, + Address, AuthMechanism, Connection, Guid, OwnedGuid, }; use crate::{ @@ -36,7 +34,7 @@ pub struct Bus { // All (cheaply) cloneable fields of `Bus` go here. #[derive(Clone, Debug)] pub struct Inner { - address: Address, + address: String, peers: Arc, guid: OwnedGuid, next_id: usize, @@ -52,21 +50,18 @@ enum Listener { } impl Bus { - pub async fn for_address(address: Option<&str>, auth_mechanism: AuthMechanism) -> Result { - let mut address = match address { - Some(address) => Address::from_str(address)?, - None => Address::from_str(&default_address())?, - }; - let guid: OwnedGuid = match address.guid() { - Some(guid) => guid.to_owned().into(), + pub async fn for_address(address: &str, auth_mechanism: AuthMechanism) -> Result { + let address = Address::try_from(address)?; + let (guid, addr) = match address.guid() { + Some(guid) => (Guid::try_from(guid)?.to_owned().into(), address.to_string()), None => { - let guid = Guid::generate(); - address = address.set_guid(guid.clone())?; - - guid.into() + let guid: OwnedGuid = Guid::generate().into(); + let addr = format!("{address},guid={guid}"); + (guid, addr) } }; - let listener = match address.transport() { + + let listener = match &address.transport()? { #[cfg(unix)] Transport::Unix(unix) => Self::unix_stream(unix).await, Transport::Tcp(tcp) => Self::tcp_stream(tcp).await, @@ -83,7 +78,7 @@ impl Bus { // Create a peer for ourselves. trace!("Creating self-dial connection."); let (client_socket, peer_socket) = zbus::connection::socket::Channel::pair(); - let service_conn = ConnectionBuilder::authenticated_socket(client_socket, guid.clone())? + let service_conn = connection::Builder::authenticated_socket(client_socket, guid.clone())? .p2p() .unique_name(fdo::BUS_NAME)? .name(fdo::BUS_NAME)? @@ -91,7 +86,7 @@ impl Bus { .serve_at(fdo::Monitoring::PATH, monitoring)? .build() .await?; - let peer_conn = ConnectionBuilder::authenticated_socket(peer_socket, guid.clone())? + let peer_conn = connection::Builder::authenticated_socket(peer_socket, guid.clone())? .p2p() .build() .await?; @@ -112,7 +107,7 @@ impl Bus { listener, cookie_task, inner: Inner { - address, + address: addr, peers, guid, next_id: 0, @@ -122,7 +117,7 @@ impl Bus { }) } - pub fn address(&self) -> &Address { + pub fn address(&self) -> &str { &self.inner.address } @@ -134,38 +129,40 @@ impl Bus { // AsyncDrop would have been nice! pub async fn cleanup(self) -> Result<()> { - match self.inner.address.transport() { - #[cfg(unix)] - Transport::Unix(unix) => match unix.path() { - UnixSocket::File(path) => remove_file(path).await.map_err(Into::into), - _ => Ok(()), - }, - _ => Ok(()), + #[cfg(unix)] + if let Listener::Unix(unix_listener) = self.listener { + let addr = unix_listener.local_addr()?; + if let Some(path) = addr.as_pathname() { + remove_file(path).await?; + } } + + Ok(()) } #[cfg(unix)] - async fn unix_stream(unix: &Unix) -> Result { + async fn unix_stream(unix: &Unix<'_>) -> Result { // TODO: Use tokio::net::UnixListener directly once it supports abstract sockets: // // https://github.com/tokio-rs/tokio/issues/4610 use std::os::unix::net::SocketAddr; + use zbus::address::transport::UnixAddrKind; - let addr = match unix.path() { + let addr = match unix.kind() { #[cfg(target_os = "linux")] - UnixSocket::Abstract(name) => { + UnixAddrKind::Abstract(name) => { use std::os::linux::net::SocketAddrExt; - let addr = SocketAddr::from_abstract_name(name.as_encoded_bytes())?; + let addr = SocketAddr::from_abstract_name(name)?; info!( "Listening on abstract UNIX socket `{}`.", - name.to_string_lossy() + String::from_utf8_lossy(name) ); addr } - UnixSocket::File(path) => { + UnixAddrKind::Path(path) => { let addr = SocketAddr::from_pathname(path)?; info!( "Listening on UNIX socket file `{}`.", @@ -174,8 +171,6 @@ impl Bus { addr } - UnixSocket::Dir(_) => bail!("`dir` transport is not supported (yet)."), - UnixSocket::TmpDir(_) => bail!("`tmpdir` transport is not supported (yet)."), _ => bail!("Unsupported address."), }; let std_listener = @@ -187,12 +182,15 @@ impl Bus { .map_err(Into::into) } - async fn tcp_stream(tcp: &Tcp) -> Result { - if tcp.nonce_file().is_some() { - bail!("`nonce-tcp` transport is not supported (yet)."); - } - info!("Listening on `{}:{}`.", tcp.host(), tcp.port()); - let address = (tcp.host(), tcp.port()); + async fn tcp_stream(tcp: &Tcp<'_>) -> Result { + let Some(host) = tcp.host() else { + bail!("No host= provided."); + }; + let Some(port) = tcp.port() else { + bail!("No port= provided."); + }; + info!("Listening on `{}:{}`.", host, port); + let address = (host, port); tokio::net::TcpListener::bind(address) .await @@ -234,14 +232,19 @@ impl Bus { } async fn accept(&mut self) -> Result { - let stream = match &mut self.listener { + match &mut self.listener { #[cfg(unix)] - Listener::Unix(listener) => listener.accept().await.map(|(stream, _)| stream.into())?, - Listener::Tcp(listener) => listener.accept().await.map(|(stream, _)| stream.into())?, - }; - debug!("Accepted connection on address `{}`", self.inner.address); - - Ok(stream) + Listener::Unix(listener) => { + let (stream, addr) = listener.accept().await?; + debug!("Accepted Unix connection from address `{:?}`", addr); + Ok(stream.into()) + } + Listener::Tcp(listener) => { + let (stream, addr) = listener.accept().await?; + debug!("Accepted TCP connection from address `{}`", addr); + Ok(stream.into()) + } + } } pub fn peers(&self) -> &Arc { @@ -262,24 +265,3 @@ impl Bus { self.inner.next_id } } - -#[cfg(unix)] -fn default_address() -> String { - let runtime_dir = env::var("XDG_RUNTIME_DIR") - .as_ref() - .map(|s| Path::new(s).to_path_buf()) - .ok() - .unwrap_or_else(|| { - Path::new("/run") - .join("user") - .join(format!("{}", nix::unistd::Uid::current())) - }); - let path = runtime_dir.join("busd-session"); - - format!("unix:path={}", path.display()) -} - -#[cfg(not(unix))] -fn default_address() -> String { - "tcp:host=127.0.0.1,port=4242".to_string() -} diff --git a/src/fdo/dbus.rs b/src/fdo/dbus.rs index 0c288e2..d652394 100644 --- a/src/fdo/dbus.rs +++ b/src/fdo/dbus.rs @@ -10,10 +10,11 @@ use zbus::{ fdo::{ ConnectionCredentials, Error, ReleaseNameReply, RequestNameFlags, RequestNameReply, Result, }, - interface, + interface, message, names::{BusName, InterfaceName, OwnedBusName, OwnedUniqueName, UniqueName, WellKnownName}, + object_server::{ResponseDispatchNotifier, SignalEmitter}, zvariant::Optional, - MessageHeader, OwnedGuid, OwnedMatchRule, ResponseDispatchNotifier, SignalContext, + OwnedGuid, OwnedMatchRule, }; use super::msg_sender; @@ -37,7 +38,7 @@ impl DBus { } /// Helper for D-Bus methods that call a function on a peer. - async fn call_mut_on_peer(&self, func: F, hdr: MessageHeader<'_>) -> Result + async fn call_mut_on_peer(&self, func: F, hdr: message::Header<'_>) -> Result where F: FnOnce(&mut Peer) -> Result, { @@ -64,8 +65,8 @@ impl DBus { /// This is already called & handled and we only need to handle it once. async fn hello( &self, - #[zbus(header)] hdr: MessageHeader<'_>, - #[zbus(signal_context)] ctxt: SignalContext<'_>, + #[zbus(header)] hdr: message::Header<'_>, + #[zbus(signal_emitter)] emitter: SignalEmitter<'_>, ) -> Result> { let name = msg_sender(&hdr); let peers = self.peers()?; @@ -80,7 +81,7 @@ impl DBus { // 2. The `Hello` response to arrive before the `NameAcquired` signal. let unique_name = peer.unique_name().clone(); let (response, listener) = ResponseDispatchNotifier::new(unique_name.clone()); - let ctxt = ctxt.to_owned(); + let ctxt = emitter.to_owned(); spawn(async move { listener.await; let owner = UniqueName::from(unique_name); @@ -110,7 +111,7 @@ impl DBus { &self, name: WellKnownName<'_>, flags: BitFlags, - #[zbus(header)] hdr: MessageHeader<'_>, + #[zbus(header)] hdr: message::Header<'_>, ) -> Result { let unique_name = msg_sender(&hdr); let peers = self.peers()?; @@ -133,7 +134,7 @@ impl DBus { async fn release_name( &self, name: WellKnownName<'_>, - #[zbus(header)] hdr: MessageHeader<'_>, + #[zbus(header)] hdr: message::Header<'_>, ) -> Result { let unique_name = msg_sender(&hdr); let peers = self.peers()?; @@ -175,7 +176,7 @@ impl DBus { async fn add_match( &self, rule: OwnedMatchRule, - #[zbus(header)] hdr: MessageHeader<'_>, + #[zbus(header)] hdr: message::Header<'_>, ) -> Result<()> { self.call_mut_on_peer( move |peer| { @@ -192,7 +193,7 @@ impl DBus { async fn remove_match( &self, rule: OwnedMatchRule, - #[zbus(header)] hdr: MessageHeader<'_>, + #[zbus(header)] hdr: message::Header<'_>, ) -> Result<()> { self.call_mut_on_peer(move |peer| peer.remove_match_rule(rule), hdr) .await @@ -384,7 +385,7 @@ impl DBus { /// It's also the signal to use to detect the appearance of new names on the bus. #[zbus(signal)] pub async fn name_owner_changed( - signal_ctxt: &SignalContext<'_>, + emitter: &SignalEmitter<'_>, name: BusName<'_>, old_owner: Optional>, new_owner: Optional>, @@ -392,12 +393,9 @@ impl DBus { /// This signal is sent to a specific application when it loses ownership of a name. #[zbus(signal)] - pub async fn name_lost(signal_ctxt: &SignalContext<'_>, name: BusName<'_>) -> zbus::Result<()>; + pub async fn name_lost(emitter: &SignalEmitter<'_>, name: BusName<'_>) -> zbus::Result<()>; /// This signal is sent to a specific application when it gains ownership of a name. #[zbus(signal)] - pub async fn name_acquired( - signal_ctxt: &SignalContext<'_>, - name: BusName<'_>, - ) -> zbus::Result<()>; + pub async fn name_acquired(emitter: &SignalEmitter<'_>, name: BusName<'_>) -> zbus::Result<()>; } diff --git a/src/fdo/mod.rs b/src/fdo/mod.rs index 446cae7..0e87199 100644 --- a/src/fdo/mod.rs +++ b/src/fdo/mod.rs @@ -2,12 +2,12 @@ mod dbus; pub use dbus::*; mod monitoring; pub use monitoring::*; -use zbus::{names::UniqueName, MessageHeader}; +use zbus::{message, names::UniqueName}; pub const BUS_NAME: &str = "org.freedesktop.DBus"; /// Helper for getting the peer name from a message header. -fn msg_sender<'h>(hdr: &'h MessageHeader<'h>) -> &'h UniqueName<'h> { +fn msg_sender<'h>(hdr: &'h message::Header<'h>) -> &'h UniqueName<'h> { // SAFETY: The bus (that's us!) is supposed to ensure a valid sender on the message. hdr.sender().expect("Missing `sender` header") } diff --git a/src/fdo/monitoring.rs b/src/fdo/monitoring.rs index d9cfad0..19c3d16 100644 --- a/src/fdo/monitoring.rs +++ b/src/fdo/monitoring.rs @@ -4,9 +4,9 @@ use tokio::spawn; use tracing::{debug, warn}; use zbus::{ fdo::{Error, Result}, - interface, + interface, message, + object_server::{ResponseDispatchNotifier, SignalEmitter}, zvariant::Optional, - MessageHeader, ResponseDispatchNotifier, SignalContext, }; use super::msg_sender; @@ -34,8 +34,8 @@ impl Monitoring { &self, match_rules: MatchRules, _flags: u32, - #[zbus(header)] hdr: MessageHeader<'_>, - #[zbus(signal_context)] ctxt: SignalContext<'_>, + #[zbus(header)] hdr: message::Header<'_>, + #[zbus(signal_emitter)] ctxt: SignalEmitter<'_>, ) -> Result> { let owner = msg_sender(&hdr).to_owned(); let peers = self diff --git a/src/peer/mod.rs b/src/peer/mod.rs index ffc3ec3..5e31aa1 100644 --- a/src/peer/mod.rs +++ b/src/peer/mod.rs @@ -7,8 +7,8 @@ pub use monitor::*; use anyhow::Result; use tracing::trace; use zbus::{ - connection::socket::BoxedSplit, names::OwnedUniqueName, AuthMechanism, Connection, - ConnectionBuilder, OwnedGuid, OwnedMatchRule, + connection, connection::socket::BoxedSplit, names::OwnedUniqueName, AuthMechanism, Connection, + OwnedGuid, OwnedMatchRule, }; use crate::{fdo, match_rules::MatchRules, name_registry::NameRegistry}; @@ -31,7 +31,7 @@ impl Peer { auth_mechanism: AuthMechanism, ) -> Result { let unique_name = OwnedUniqueName::try_from(format!(":busd.{id}")).unwrap(); - let conn = ConnectionBuilder::socket(socket) + let conn = connection::Builder::socket(socket) .server(guid)? .p2p() .auth_mechanism(auth_mechanism) diff --git a/src/peer/stream.rs b/src/peer/stream.rs index 926ff27..91a76ec 100644 --- a/src/peer/stream.rs +++ b/src/peer/stream.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use anyhow::{bail, Error, Result}; use futures_util::{Stream as FutureStream, TryStream, TryStreamExt}; use tracing::trace; -use zbus::{zvariant::Type, Message, MessageBuilder, MessageStream, MessageType}; +use zbus::{message, zvariant::Signature, Message, MessageStream}; use crate::peer::Peer; @@ -30,7 +30,8 @@ impl Stream { let header = msg.header(); // Ensure destination field is present and readable for non-signals. - if msg.message_type() != MessageType::Signal && header.destination().is_none() { + if msg.message_type() != message::Type::Signal && header.destination().is_none() + { bail!("missing destination field"); } @@ -42,7 +43,7 @@ impl Stream { None => { let signature = match header.signature() { Some(sig) => sig.clone(), - None => <()>::signature(), + None => Signature::Unit, }; let body = msg.body(); let body_bytes = body.data(); @@ -53,7 +54,7 @@ impl Stream { .map(|fd| fd.try_clone().map(Into::into)) .collect::>>()?; let builder = - MessageBuilder::from(header.clone()).sender(&unique_name)?; + message::Builder::from(header.clone()).sender(&unique_name)?; let new_msg = unsafe { builder.build_raw_body( body_bytes, diff --git a/src/peers.rs b/src/peers.rs index de3d320..ed4af36 100644 --- a/src/peers.rs +++ b/src/peers.rs @@ -13,9 +13,10 @@ use tokio::{spawn, sync::RwLock}; use tracing::{debug, trace, warn}; use zbus::{ connection::socket::BoxedSplit, + message, names::{BusName, OwnedUniqueName, UniqueName}, zvariant::Optional, - AuthMechanism, Message, MessageType, OwnedGuid, + AuthMechanism, Message, OwnedGuid, }; use crate::{ @@ -217,7 +218,7 @@ impl Peers { }; match msg.message_type() { - MessageType::Signal => self.broadcast_msg(msg).await, + message::Type::Signal => self.broadcast_msg(msg).await, _ => match msg.header().destination() { Some(dest) => { if let Err(e) = self.send_msg(msg.clone(), dest.clone()).await { diff --git a/tests/fdo.rs b/tests/fdo.rs index c10ce42..876292a 100644 --- a/tests/fdo.rs +++ b/tests/fdo.rs @@ -13,9 +13,11 @@ use rand::{ use tokio::{select, sync::oneshot::Sender}; use tracing::instrument; use zbus::{ + connection, fdo::{self, DBusProxy, ReleaseNameReply, RequestNameFlags, RequestNameReply}, names::{BusName, WellKnownName}, - AuthMechanism, CacheProperties, ConnectionBuilder, + proxy::CacheProperties, + AuthMechanism, }; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -40,9 +42,7 @@ async fn name_ownership_changes() { } async fn name_ownership_changes_(address: &str, auth_mechanism: AuthMechanism) { - let mut bus = Bus::for_address(Some(address), auth_mechanism) - .await - .unwrap(); + let mut bus = Bus::for_address(address, auth_mechanism).await.unwrap(); let (tx, rx) = tokio::sync::oneshot::channel(); let handle = tokio::spawn(async move { @@ -65,7 +65,7 @@ async fn name_ownership_changes_(address: &str, auth_mechanism: AuthMechanism) { #[instrument] async fn name_ownership_changes_client(address: &str, tx: Sender<()>) -> anyhow::Result<()> { - let conn = ConnectionBuilder::address(address)?.build().await?; + let conn = connection::Builder::address(address)?.build().await?; let conn_unique_name = conn.unique_name().unwrap().to_owned(); let dbus_proxy = DBusProxy::builder(&conn) .cache_properties(CacheProperties::No) @@ -127,7 +127,7 @@ async fn name_ownership_changes_client(address: &str, tx: Sender<()>) -> anyhow: ); // Now we try with another connection and we should be queued. - let conn2 = ConnectionBuilder::address(address)?.build().await?; + let conn2 = connection::Builder::address(address)?.build().await?; let conn2_unique_name = conn2.unique_name().unwrap().to_owned(); let changed = name_changed_stream.next().await.unwrap(); ensure!( diff --git a/tests/greet.rs b/tests/greet.rs index 1c7bb2a..f11edbe 100644 --- a/tests/greet.rs +++ b/tests/greet.rs @@ -14,11 +14,14 @@ use rand::{ use tokio::{select, sync::mpsc::channel, time::timeout}; use tracing::instrument; use zbus::{ + connection, fdo::{self, DBusProxy}, - interface, proxy, + interface, message, + object_server::SignalEmitter, + proxy, + proxy::CacheProperties, zvariant::ObjectPath, - AsyncDrop, AuthMechanism, CacheProperties, Connection, ConnectionBuilder, MatchRule, - MessageHeader, MessageStream, SignalContext, + AsyncDrop, AuthMechanism, Connection, MatchRule, MessageStream, }; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -43,9 +46,7 @@ async fn greet() { } async fn greet_(socket_addr: &str, auth_mechanism: AuthMechanism) { - let mut bus = Bus::for_address(Some(socket_addr), auth_mechanism) - .await - .unwrap(); + let mut bus = Bus::for_address(socket_addr, auth_mechanism).await.unwrap(); let (tx, mut rx) = channel(1); let handle = tokio::spawn(async move { @@ -81,8 +82,8 @@ async fn greet_service(socket_addr: &str) -> anyhow::Result { async fn say_hello( &mut self, name: &str, - #[zbus(signal_context)] ctxt: SignalContext<'_>, - #[zbus(header)] header: MessageHeader<'_>, + #[zbus(signal_emitter)] ctxt: SignalEmitter<'_>, + #[zbus(header)] header: message::Header<'_>, ) -> fdo::Result { self.count += 1; let path = header.path().unwrap().clone(); @@ -95,7 +96,7 @@ async fn greet_service(socket_addr: &str) -> anyhow::Result { #[zbus(signal)] async fn greeted( - ctxt: &SignalContext<'_>, + ctxt: &SignalEmitter<'_>, name: &str, count: u64, path: ObjectPath<'_>, @@ -103,7 +104,7 @@ async fn greet_service(socket_addr: &str) -> anyhow::Result { } let greeter = Greeter { count: 0 }; - ConnectionBuilder::address(socket_addr)? + connection::Builder::address(socket_addr)? .name("org.zbus.MyGreeter")? .serve_at("/org/zbus/MyGreeter", greeter)? .build() @@ -124,7 +125,7 @@ async fn greet_client(socket_addr: &str) -> anyhow::Result<()> { async fn greeted(name: &str, count: u64, path: ObjectPath<'_>); } - let conn = ConnectionBuilder::address(socket_addr)?.build().await?; + let conn = connection::Builder::address(socket_addr)?.build().await?; let proxy = MyGreeterProxy::builder(&conn) .destination("org.zbus.MyGreeter")? diff --git a/tests/monitor.rs b/tests/monitor.rs index 84fd463..608295d 100644 --- a/tests/monitor.rs +++ b/tests/monitor.rs @@ -5,9 +5,12 @@ use ntest::timeout; use tokio::{select, sync::oneshot::Sender}; use tracing::instrument; use zbus::{ + connection, fdo::{DBusProxy, MonitoringProxy, NameAcquired, NameLost, NameOwnerChanged, RequestNameFlags}, + message, names::BusName, - AuthMechanism, CacheProperties, ConnectionBuilder, MessageStream, MessageType, + proxy::CacheProperties, + AuthMechanism, MessageStream, }; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -17,7 +20,7 @@ async fn become_monitor() { busd::tracing_subscriber::init(); let address = "tcp:host=127.0.0.1,port=4242".to_string(); - let mut bus = Bus::for_address(Some(&address), AuthMechanism::Anonymous) + let mut bus = Bus::for_address(&address, AuthMechanism::Anonymous) .await .unwrap(); let (tx, rx) = tokio::sync::oneshot::channel(); @@ -43,7 +46,7 @@ async fn become_monitor() { #[instrument] async fn become_monitor_client(address: &str, tx: Sender<()>) -> anyhow::Result<()> { // Create a monitor that wants all messages. - let conn = ConnectionBuilder::address(address)?.build().await?; + let conn = connection::Builder::address(address)?.build().await?; let mut msg_stream = MessageStream::from(&conn); MonitoringProxy::builder(&conn) .cache_properties(CacheProperties::No) @@ -80,7 +83,7 @@ async fn become_monitor_client(address: &str, tx: Sender<()>) -> anyhow::Result< ); // Now a client that calls a method that triggers a signal. - let conn = ConnectionBuilder::address(address)?.build().await?; + let conn = connection::Builder::address(address)?.build().await?; let name = "org.dbus2.MonitorTest"; DBusProxy::builder(&conn) .cache_properties(CacheProperties::No) @@ -102,7 +105,7 @@ async fn become_monitor_client(address: &str, tx: Sender<()>) -> anyhow::Result< let member = header.member(); match msg.message_type() { - MessageType::MethodCall => match member.unwrap().as_str() { + message::Type::MethodCall => match member.unwrap().as_str() { "Hello" => { hello_serial = Some(msg.primary_header().serial_num()); } @@ -111,7 +114,7 @@ async fn become_monitor_client(address: &str, tx: Sender<()>) -> anyhow::Result< } method => panic!("unexpected method call: {}", method), }, - MessageType::MethodReturn => { + message::Type::MethodReturn => { let serial = header.reply_serial(); if serial == hello_serial { hello_serial = None; @@ -121,7 +124,7 @@ async fn become_monitor_client(address: &str, tx: Sender<()>) -> anyhow::Result< panic!("unexpected method return: {}", serial.unwrap()); } } - MessageType::Signal => { + message::Type::Signal => { if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) { let args = signal.args()?; ensure!( diff --git a/tests/multiple_conns.rs b/tests/multiple_conns.rs index 9aba4ef..c8ab0d0 100644 --- a/tests/multiple_conns.rs +++ b/tests/multiple_conns.rs @@ -11,7 +11,7 @@ use rand::{ }; use tokio::{select, sync::oneshot::channel}; use tracing::instrument; -use zbus::{AuthMechanism, ConnectionBuilder}; +use zbus::{connection, AuthMechanism}; #[tokio::test(flavor = "multi_thread", worker_threads = 8)] #[instrument] @@ -34,9 +34,7 @@ async fn multi_conenct() { } async fn multi_conenct_(socket_addr: &str, auth_mechanism: AuthMechanism) { - let mut bus = Bus::for_address(Some(socket_addr), auth_mechanism) - .await - .unwrap(); + let mut bus = Bus::for_address(socket_addr, auth_mechanism).await.unwrap(); let (tx, rx) = channel(); let handle = tokio::spawn(async move { @@ -62,7 +60,7 @@ async fn multi_conenct_(socket_addr: &str, auth_mechanism: AuthMechanism) { async fn multi_clients_connect(socket_addr: &str) -> anyhow::Result<()> { // Create 10 connections simultaneously. let conns: Vec<_> = (0..10) - .map(|_| ConnectionBuilder::address(socket_addr).unwrap().build()) + .map(|_| connection::Builder::address(socket_addr).unwrap().build()) .collect(); join_all(conns).await;