From 308ad15ea7c145eb48a6b0d1d1250412763acb6d Mon Sep 17 00:00:00 2001 From: Zeeshan Ali Khan Date: Wed, 17 Apr 2024 22:45:17 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20Use=20new=20zbus::socket::?= =?UTF-8?q?Channel=20for=20self-dial=20connection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This should improve the performance of our own D-Bus API since it avoids deserialization and re-allocation on the receiver side. --- src/bus/mod.rs | 53 +++++++++++++++++++++++-------------------------- src/peer/mod.rs | 20 ++++++++++++++----- src/peers.rs | 23 ++++++++++++++++++++- 3 files changed, 62 insertions(+), 34 deletions(-) diff --git a/src/bus/mod.rs b/src/bus/mod.rs index bc322d0..7194a54 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -3,11 +3,11 @@ mod cookies; use anyhow::{bail, Error, Ok, Result}; use futures_util::{ future::{select, Either}, - pin_mut, try_join, TryFutureExt, + pin_mut, }; -use std::{cell::OnceCell, str::FromStr, sync::Arc}; #[cfg(unix)] use std::{env, path::Path}; +use std::{str::FromStr, sync::Arc}; #[cfg(unix)] use tokio::fs::remove_file; use tokio::{spawn, task::JoinHandle}; @@ -39,9 +39,9 @@ pub struct Inner { address: Address, peers: Arc, guid: OwnedGuid, - next_id: Option, + next_id: usize, auth_mechanism: AuthMechanism, - self_conn: OnceCell, + _self_conn: Connection, } #[derive(Debug)] @@ -75,27 +75,31 @@ impl Bus { _ => bail!("Unsupported address `{}`.", address), }?; - let mut bus = Self::new(address.clone(), guid.clone(), listener, auth_mechanism).await?; + let peers = Peers::new(); + + let dbus = DBus::new(peers.clone(), guid.clone()); + let monitoring = Monitoring::new(peers.clone()); // Create a peer for ourselves. trace!("Creating self-dial connection."); - let dbus = DBus::new(bus.peers().clone(), guid.clone()); - let monitoring = Monitoring::new(bus.peers().clone()); - let conn_builder_fut = ConnectionBuilder::address(address)? - .auth_mechanisms(&[auth_mechanism]) + let (client_socket, peer_socket) = zbus::connection::socket::Channel::pair(); + let service_conn = ConnectionBuilder::authenticated_socket(client_socket, guid.clone())? .p2p() .unique_name(fdo::BUS_NAME)? .name(fdo::BUS_NAME)? .serve_at(fdo::DBus::PATH, dbus)? .serve_at(fdo::Monitoring::PATH, monitoring)? .build() - .map_err(Into::into); + .await?; + let peer_conn = ConnectionBuilder::authenticated_socket(peer_socket, guid.clone())? + .p2p() + .build() + .await?; - let (conn, ()) = try_join!(conn_builder_fut, bus.accept_next())?; - bus.inner.self_conn.set(conn).unwrap(); + peers.add_us(peer_conn).await; trace!("Self-dial connection created."); - Ok(bus) + Self::new(address, peers, guid, listener, auth_mechanism, service_conn).await } pub fn address(&self) -> &Address { @@ -122,9 +126,11 @@ impl Bus { async fn new( address: Address, + peers: Arc, guid: OwnedGuid, listener: Listener, auth_mechanism: AuthMechanism, + self_conn: Connection, ) -> Result { let cookie_task = if auth_mechanism == AuthMechanism::Cookie { let (cookie_task, cookie_sync_rx) = cookies::run_sync(); @@ -139,11 +145,11 @@ impl Bus { cookie_task, inner: Inner { address, - peers: Peers::new(), + peers, guid, - next_id: None, + next_id: 0, auth_mechanism, - self_conn: OnceCell::new(), + _self_conn: self_conn, }, }) } @@ -260,19 +266,10 @@ impl Bus { self.inner.auth_mechanism } - fn next_id(&mut self) -> Option { - match self.inner.next_id { - None => { - self.inner.next_id = Some(0); - - None - } - Some(id) => { - self.inner.next_id = Some(id + 1); + fn next_id(&mut self) -> usize { + self.inner.next_id += 1; - Some(id) - } - } + self.inner.next_id } } diff --git a/src/peer/mod.rs b/src/peer/mod.rs index d9ae1f1..ee9fb90 100644 --- a/src/peer/mod.rs +++ b/src/peer/mod.rs @@ -26,14 +26,11 @@ pub struct Peer { impl Peer { pub async fn new( guid: OwnedGuid, - id: Option, + id: usize, socket: BoxedSplit, auth_mechanism: AuthMechanism, ) -> Result { - let unique_name = match id { - Some(id) => OwnedUniqueName::try_from(format!(":busd.{id}")).unwrap(), - None => OwnedUniqueName::try_from(fdo::BUS_NAME).unwrap(), - }; + let unique_name = OwnedUniqueName::try_from(format!(":busd.{id}")).unwrap(); let conn = ConnectionBuilder::socket(socket) .server(guid)? .p2p() @@ -51,6 +48,19 @@ impl Peer { }) } + // This the the bus itself, serving the FDO D-Bus API. + pub async fn new_us(conn: Connection) -> Self { + let unique_name = OwnedUniqueName::try_from(fdo::BUS_NAME).unwrap(); + + Self { + conn, + unique_name, + match_rules: MatchRules::default(), + greeted: true, + canceled_event: Event::new(), + } + } + pub fn unique_name(&self) -> &OwnedUniqueName { &self.unique_name } diff --git a/src/peers.rs b/src/peers.rs index 156682c..de3d320 100644 --- a/src/peers.rs +++ b/src/peers.rs @@ -46,7 +46,7 @@ impl Peers { pub async fn add( self: &Arc, guid: &OwnedGuid, - id: Option, + id: usize, socket: BoxedSplit, auth_mechanism: AuthMechanism, ) -> Result<()> { @@ -72,6 +72,27 @@ impl Peers { Ok(()) } + pub async fn add_us(self: &Arc, conn: zbus::Connection) { + let mut peers = self.peers_mut().await; + let peer = Peer::new_us(conn).await; + let unique_name = peer.unique_name().clone(); + match peers.get(&unique_name) { + Some(peer) => panic!( + "Unique name `{}` re-used. We're in deep trouble if this happens", + peer.unique_name() + ), + None => { + let peer_stream = peer.stream(); + let listener = peer.listen_cancellation(); + tokio::spawn( + self.clone() + .serve_peer(peer_stream, listener, unique_name.clone()), + ); + peers.insert(unique_name.clone(), peer); + } + } + } + pub async fn peers(&self) -> impl Deref> + '_ { self.peers.read().await }