Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡️ Use new zbus::socket::Channel for self-dial connection #73

Merged
merged 2 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 22 additions & 35 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ name = "busd"
path = "src/bin/busd.rs"

[dependencies]
zbus = { git = "https://github.com/dbus2/zbus/", features = ["tokio", "bus-impl"], default-features = false }
zbus = { git = "https://github.com/dbus2/zbus/", features = [
"tokio",
"bus-impl",
], default-features = false }
#zbus = { version = "4", features = ["tokio", "bus-impl"], default-features = false }
tokio = { version = "1.37.0", features = [
"macros",
Expand Down
93 changes: 40 additions & 53 deletions src/bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ mod cookies;
use anyhow::{bail, Error, Ok, Result};
use futures_util::{
future::{select, Either},
pin_mut, try_join, TryFutureExt,
pin_mut,
};
use std::{cell::OnceCell, str::FromStr, sync::Arc};
#[cfg(unix)]
use std::{env, path::Path};
use std::{str::FromStr, sync::Arc};
#[cfg(unix)]
use tokio::fs::remove_file;
use tokio::{spawn, task::JoinHandle};
Expand Down Expand Up @@ -39,9 +39,9 @@ pub struct Inner {
address: Address,
peers: Arc<Peers>,
guid: OwnedGuid,
next_id: Option<usize>,
next_id: usize,
auth_mechanism: AuthMechanism,
self_conn: OnceCell<Connection>,
_self_conn: Connection,
}

#[derive(Debug)]
Expand Down Expand Up @@ -75,27 +75,51 @@ impl Bus {
_ => bail!("Unsupported address `{}`.", address),
}?;

let mut bus = Self::new(address.clone(), guid.clone(), listener, auth_mechanism).await?;
let peers = Peers::new();

let dbus = DBus::new(peers.clone(), guid.clone());
let monitoring = Monitoring::new(peers.clone());

// Create a peer for ourselves.
trace!("Creating self-dial connection.");
let dbus = DBus::new(bus.peers().clone(), guid.clone());
let monitoring = Monitoring::new(bus.peers().clone());
let conn_builder_fut = ConnectionBuilder::address(address)?
.auth_mechanisms(&[auth_mechanism])
let (client_socket, peer_socket) = zbus::connection::socket::Channel::pair();
let service_conn = ConnectionBuilder::authenticated_socket(client_socket, guid.clone())?
.p2p()
.unique_name(fdo::BUS_NAME)?
.name(fdo::BUS_NAME)?
.serve_at(fdo::DBus::PATH, dbus)?
.serve_at(fdo::Monitoring::PATH, monitoring)?
.build()
.map_err(Into::into);
.await?;
let peer_conn = ConnectionBuilder::authenticated_socket(peer_socket, guid.clone())?
.p2p()
.build()
.await?;

let (conn, ()) = try_join!(conn_builder_fut, bus.accept_next())?;
bus.inner.self_conn.set(conn).unwrap();
peers.add_us(peer_conn).await;
trace!("Self-dial connection created.");

Ok(bus)
let cookie_task = if auth_mechanism == AuthMechanism::Cookie {
let (cookie_task, cookie_sync_rx) = cookies::run_sync();
cookie_sync_rx.await?;

Some(cookie_task)
} else {
None
};

Ok(Self {
listener,
cookie_task,
inner: Inner {
address,
peers,
guid,
next_id: 0,
auth_mechanism,
_self_conn: service_conn,
},
})
}

pub fn address(&self) -> &Address {
Expand All @@ -120,34 +144,6 @@ impl Bus {
}
}

async fn new(
address: Address,
guid: OwnedGuid,
listener: Listener,
auth_mechanism: AuthMechanism,
) -> Result<Self> {
let cookie_task = if auth_mechanism == AuthMechanism::Cookie {
let (cookie_task, cookie_sync_rx) = cookies::run_sync();
cookie_sync_rx.await?;

Some(cookie_task)
} else {
None
};
Ok(Self {
listener,
cookie_task,
inner: Inner {
address,
peers: Peers::new(),
guid,
next_id: None,
auth_mechanism,
self_conn: OnceCell::new(),
},
})
}

#[cfg(unix)]
async fn unix_stream(unix: &Unix) -> Result<Listener> {
// TODO: Use tokio::net::UnixListener directly once it supports abstract sockets:
Expand Down Expand Up @@ -260,19 +256,10 @@ impl Bus {
self.inner.auth_mechanism
}

fn next_id(&mut self) -> Option<usize> {
match self.inner.next_id {
None => {
self.inner.next_id = Some(0);

None
}
Some(id) => {
self.inner.next_id = Some(id + 1);
fn next_id(&mut self) -> usize {
self.inner.next_id += 1;

Some(id)
}
}
self.inner.next_id
}
}

Expand Down
20 changes: 15 additions & 5 deletions src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ pub struct Peer {
impl Peer {
pub async fn new(
guid: OwnedGuid,
id: Option<usize>,
id: usize,
socket: BoxedSplit,
auth_mechanism: AuthMechanism,
) -> Result<Self> {
let unique_name = match id {
Some(id) => OwnedUniqueName::try_from(format!(":busd.{id}")).unwrap(),
None => OwnedUniqueName::try_from(fdo::BUS_NAME).unwrap(),
};
let unique_name = OwnedUniqueName::try_from(format!(":busd.{id}")).unwrap();
let conn = ConnectionBuilder::socket(socket)
.server(guid)?
.p2p()
Expand All @@ -51,6 +48,19 @@ impl Peer {
})
}

// This the the bus itself, serving the FDO D-Bus API.
pub async fn new_us(conn: Connection) -> Self {
let unique_name = OwnedUniqueName::try_from(fdo::BUS_NAME).unwrap();

Self {
conn,
unique_name,
match_rules: MatchRules::default(),
greeted: true,
canceled_event: Event::new(),
}
}

pub fn unique_name(&self) -> &OwnedUniqueName {
&self.unique_name
}
Expand Down
Loading