Skip to content

Commit

Permalink
⚡️ Use new zbus::socket::Channel for self-dial connection
Browse files Browse the repository at this point in the history
This should improve the performance of our own D-Bus API since it avoids
deserialization and re-allocation on the receiver side.
  • Loading branch information
Zeeshan Ali Khan committed Apr 19, 2024
1 parent 4d49272 commit 308ad15
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 34 deletions.
53 changes: 25 additions & 28 deletions src/bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ mod cookies;
use anyhow::{bail, Error, Ok, Result};
use futures_util::{
future::{select, Either},
pin_mut, try_join, TryFutureExt,
pin_mut,
};
use std::{cell::OnceCell, str::FromStr, sync::Arc};
#[cfg(unix)]
use std::{env, path::Path};
use std::{str::FromStr, sync::Arc};
#[cfg(unix)]
use tokio::fs::remove_file;
use tokio::{spawn, task::JoinHandle};
Expand Down Expand Up @@ -39,9 +39,9 @@ pub struct Inner {
address: Address,
peers: Arc<Peers>,
guid: OwnedGuid,
next_id: Option<usize>,
next_id: usize,
auth_mechanism: AuthMechanism,
self_conn: OnceCell<Connection>,
_self_conn: Connection,
}

#[derive(Debug)]
Expand Down Expand Up @@ -75,27 +75,31 @@ impl Bus {
_ => bail!("Unsupported address `{}`.", address),
}?;

let mut bus = Self::new(address.clone(), guid.clone(), listener, auth_mechanism).await?;
let peers = Peers::new();

let dbus = DBus::new(peers.clone(), guid.clone());
let monitoring = Monitoring::new(peers.clone());

// Create a peer for ourselves.
trace!("Creating self-dial connection.");
let dbus = DBus::new(bus.peers().clone(), guid.clone());
let monitoring = Monitoring::new(bus.peers().clone());
let conn_builder_fut = ConnectionBuilder::address(address)?
.auth_mechanisms(&[auth_mechanism])
let (client_socket, peer_socket) = zbus::connection::socket::Channel::pair();
let service_conn = ConnectionBuilder::authenticated_socket(client_socket, guid.clone())?
.p2p()
.unique_name(fdo::BUS_NAME)?
.name(fdo::BUS_NAME)?
.serve_at(fdo::DBus::PATH, dbus)?
.serve_at(fdo::Monitoring::PATH, monitoring)?
.build()
.map_err(Into::into);
.await?;
let peer_conn = ConnectionBuilder::authenticated_socket(peer_socket, guid.clone())?
.p2p()
.build()
.await?;

let (conn, ()) = try_join!(conn_builder_fut, bus.accept_next())?;
bus.inner.self_conn.set(conn).unwrap();
peers.add_us(peer_conn).await;
trace!("Self-dial connection created.");

Ok(bus)
Self::new(address, peers, guid, listener, auth_mechanism, service_conn).await
}

pub fn address(&self) -> &Address {
Expand All @@ -122,9 +126,11 @@ impl Bus {

async fn new(
address: Address,
peers: Arc<Peers>,
guid: OwnedGuid,
listener: Listener,
auth_mechanism: AuthMechanism,
self_conn: Connection,
) -> Result<Self> {
let cookie_task = if auth_mechanism == AuthMechanism::Cookie {
let (cookie_task, cookie_sync_rx) = cookies::run_sync();
Expand All @@ -139,11 +145,11 @@ impl Bus {
cookie_task,
inner: Inner {
address,
peers: Peers::new(),
peers,
guid,
next_id: None,
next_id: 0,
auth_mechanism,
self_conn: OnceCell::new(),
_self_conn: self_conn,
},
})
}
Expand Down Expand Up @@ -260,19 +266,10 @@ impl Bus {
self.inner.auth_mechanism
}

fn next_id(&mut self) -> Option<usize> {
match self.inner.next_id {
None => {
self.inner.next_id = Some(0);

None
}
Some(id) => {
self.inner.next_id = Some(id + 1);
fn next_id(&mut self) -> usize {
self.inner.next_id += 1;

Some(id)
}
}
self.inner.next_id
}
}

Expand Down
20 changes: 15 additions & 5 deletions src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ pub struct Peer {
impl Peer {
pub async fn new(
guid: OwnedGuid,
id: Option<usize>,
id: usize,
socket: BoxedSplit,
auth_mechanism: AuthMechanism,
) -> Result<Self> {
let unique_name = match id {
Some(id) => OwnedUniqueName::try_from(format!(":busd.{id}")).unwrap(),
None => OwnedUniqueName::try_from(fdo::BUS_NAME).unwrap(),
};
let unique_name = OwnedUniqueName::try_from(format!(":busd.{id}")).unwrap();
let conn = ConnectionBuilder::socket(socket)
.server(guid)?
.p2p()
Expand All @@ -51,6 +48,19 @@ impl Peer {
})
}

// This the the bus itself, serving the FDO D-Bus API.
pub async fn new_us(conn: Connection) -> Self {
let unique_name = OwnedUniqueName::try_from(fdo::BUS_NAME).unwrap();

Self {
conn,
unique_name,
match_rules: MatchRules::default(),
greeted: true,
canceled_event: Event::new(),
}
}

pub fn unique_name(&self) -> &OwnedUniqueName {
&self.unique_name
}
Expand Down
23 changes: 22 additions & 1 deletion src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Peers {
pub async fn add(
self: &Arc<Self>,
guid: &OwnedGuid,
id: Option<usize>,
id: usize,
socket: BoxedSplit,
auth_mechanism: AuthMechanism,
) -> Result<()> {
Expand All @@ -72,6 +72,27 @@ impl Peers {
Ok(())
}

pub async fn add_us(self: &Arc<Self>, conn: zbus::Connection) {
let mut peers = self.peers_mut().await;
let peer = Peer::new_us(conn).await;
let unique_name = peer.unique_name().clone();
match peers.get(&unique_name) {
Some(peer) => panic!(
"Unique name `{}` re-used. We're in deep trouble if this happens",
peer.unique_name()
),
None => {
let peer_stream = peer.stream();
let listener = peer.listen_cancellation();
tokio::spawn(
self.clone()
.serve_peer(peer_stream, listener, unique_name.clone()),
);
peers.insert(unique_name.clone(), peer);
}
}
}

pub async fn peers(&self) -> impl Deref<Target = BTreeMap<OwnedUniqueName, Peer>> + '_ {
self.peers.read().await
}
Expand Down

0 comments on commit 308ad15

Please sign in to comment.