Skip to content

Commit

Permalink
👽️ Port to zbus 4 API
Browse files Browse the repository at this point in the history
This also implies bumping required versions of shared depdendencies, like
event-listener and tokio.
  • Loading branch information
Zeeshan Ali Khan committed Feb 9, 2024
1 parent c756be5 commit ac4c62d
Show file tree
Hide file tree
Showing 14 changed files with 526 additions and 463 deletions.
604 changes: 337 additions & 267 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 15 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@ name = "busd"
path = "src/bin/busd.rs"

[dependencies]
zbus = { git = "https://github.com/dbus2/zbus/", features = ["tokio"], default-features = false }
#zbus = { version = "3.14.1", features = ["tokio"], default-features = false }
tokio = { version = "1.19.2", features = ["macros", "rt-multi-thread", "signal", "tracing", "fs" ] }
zbus = { git = "https://github.com/dbus2/zbus/", features = ["tokio", "bus-impl"], default-features = false }
#zbus = { version = "4", features = ["tokio", "bus-impl"], default-features = false }
tokio = { version = "1.19.2", features = [
"macros",
"rt-multi-thread",
"signal",
"tracing",
"fs",
] }
clap = { version = "4.0.18", features = ["derive"] }
tracing = "0.1.34"
tracing-subscriber = { version = "0.3.11", features = ["env-filter" , "fmt", "ansi"], default-features = false, optional = true }
tracing-subscriber = { version = "0.3.11", features = [
"env-filter",
"fmt",
"ansi",
], default-features = false, optional = true }
anyhow = "1.0.58"
# Explicitly depend on serde to enable `rc` feature.
serde = { version = "1.0.140", features = ["rc"] }
Expand All @@ -36,7 +46,7 @@ console-subscriber = { version = "0.1.8", optional = true }
hex = "0.4.3"
xdg-home = "1.0.0"
rand = "0.8.5"
event-listener = "2.5.3"
event-listener = "5"

[target.'cfg(unix)'.dependencies]
nix = "0.26.0"
Expand Down
163 changes: 90 additions & 73 deletions src/bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ use futures_util::{
};
use std::{cell::OnceCell, str::FromStr, sync::Arc};
#[cfg(unix)]
use std::{
env,
path::{Path, PathBuf},
};
use std::{env, path::Path};
#[cfg(unix)]
use tokio::fs::remove_file;
use tokio::{spawn, task::JoinHandle};
use tracing::{debug, info, trace, warn};
use zbus::{Address, AuthMechanism, Connection, ConnectionBuilder, Guid, Socket, TcpAddress};
#[cfg(unix)]
use zbus::address::transport::{Unix, UnixSocket};
use zbus::{
address::{transport::Tcp, Transport},
connection::socket::BoxedSplit,
Address, AuthMechanism, Connection, ConnectionBuilder, Guid, OwnedGuid,
};

use crate::{
fdo::{self, DBus, Monitoring},
Expand All @@ -35,7 +38,7 @@ pub struct Bus {
pub struct Inner {
address: Address,
peers: Arc<Peers>,
guid: Arc<Guid>,
guid: OwnedGuid,
next_id: Option<usize>,
auth_mechanism: AuthMechanism,
self_conn: OnceCell<Connection>,
Expand All @@ -44,45 +47,38 @@ pub struct Inner {
#[derive(Debug)]
enum Listener {
#[cfg(unix)]
Unix {
listener: tokio::net::UnixListener,
socket_path: PathBuf,
},
Tcp {
listener: tokio::net::TcpListener,
},
Unix(tokio::net::UnixListener),
Tcp(tokio::net::TcpListener),
}

impl Bus {
pub async fn for_address(address: Option<&str>, auth_mechanism: AuthMechanism) -> Result<Self> {
let address = match address {
Some(address) => address.to_string(),
None => default_address(),
let mut address = match address {
Some(address) => Address::from_str(address)?,
None => Address::from_str(&default_address())?,
};
let address = Address::from_str(&address)?;
let listener = match &address {
#[cfg(unix)]
Address::Unix(path) => {
let path = Path::new(&path);
info!("Listening on {}.", path.display());

Self::unix_stream(path).await
}
Address::Tcp(address) => {
info!("Listening on `{}:{}`.", address.host(), address.port());
let guid: OwnedGuid = match address.guid() {
Some(guid) => guid.to_owned().into(),
None => {
let guid = Guid::generate();
address = address.set_guid(guid.clone())?;

Self::tcp_stream(address).await
guid.into()
}
Address::NonceTcp { .. } => bail!("`nonce-tcp` transport is not supported (yet)."),
Address::Autolaunch(_) => bail!("`autolaunch` transport is not supported (yet)."),
};
let listener = match address.transport() {
#[cfg(unix)]
Transport::Unix(unix) => Self::unix_stream(unix).await,
Transport::Tcp(tcp) => Self::tcp_stream(tcp).await,
#[cfg(windows)]
Transport::Autolaunch(_) => bail!("`autolaunch` transport is not supported (yet)."),
_ => bail!("Unsupported address `{}`.", address),
}?;

let mut bus = Self::new(address.clone(), listener, auth_mechanism).await?;
let mut bus = Self::new(address.clone(), guid.clone(), listener, auth_mechanism).await?;

// Create a peer for ourselves.
trace!("Creating self-dial connection.");
let guid = bus.guid().clone();
let dbus = DBus::new(bus.peers().clone(), guid.clone());
let monitoring = Monitoring::new(bus.peers().clone());
let conn_builder_fut = ConnectionBuilder::address(address)?
Expand Down Expand Up @@ -114,17 +110,18 @@ impl Bus {

// AsyncDrop would have been nice!
pub async fn cleanup(self) -> Result<()> {
match self.listener {
#[cfg(unix)]
Listener::Unix { socket_path, .. } => {
remove_file(socket_path).await.map_err(Into::into)
}
Listener::Tcp { .. } => Ok(()),
match self.inner.address.transport() {
Transport::Unix(unix) => match unix.path() {
UnixSocket::File(path) => remove_file(path).await.map_err(Into::into),
_ => Ok(()),
},
_ => Ok(()),
}
}

async fn new(
address: Address,
guid: OwnedGuid,
listener: Listener,
auth_mechanism: AuthMechanism,
) -> Result<Self> {
Expand All @@ -142,7 +139,7 @@ impl Bus {
inner: Inner {
address,
peers: Peers::new(),
guid: Arc::new(Guid::generate()),
guid,
next_id: None,
auth_mechanism,
self_conn: OnceCell::new(),
Expand All @@ -151,23 +148,56 @@ impl Bus {
}

#[cfg(unix)]
async fn unix_stream(socket_path: &Path) -> Result<Listener> {
let socket_path = socket_path.to_path_buf();
let listener = Listener::Unix {
listener: tokio::net::UnixListener::bind(&socket_path)?,
socket_path,
};
async fn unix_stream(unix: &Unix) -> Result<Listener> {
// TODO: Use tokio::net::UnixListener directly once it supports abstract sockets:
//
// https://github.com/tokio-rs/tokio/issues/4610

use std::os::{linux::net::SocketAddrExt, unix::net::SocketAddr};

let addr = match unix.path() {
UnixSocket::Abstract(name) => {
let addr = SocketAddr::from_abstract_name(name.as_encoded_bytes())?;
info!(
"Listening on abstract UNIX socket `{}`.",
name.to_string_lossy()
);

Ok(listener)
}
addr
}
UnixSocket::File(path) => {
let addr = SocketAddr::from_pathname(path)?;
info!(
"Listening on UNIX socket file `{}`.",
path.to_string_lossy()
);

async fn tcp_stream(address: &TcpAddress) -> Result<Listener> {
let address = (address.host(), address.port());
let listener = Listener::Tcp {
listener: tokio::net::TcpListener::bind(address).await?,
addr
}
UnixSocket::Dir(_) => bail!("`dir` transport is not supported (yet)."),
UnixSocket::TmpDir(_) => bail!("`tmpdir` transport is not supported (yet)."),
_ => bail!("Unsupported address."),
};
let std_listener =
tokio::task::spawn_blocking(move || std::os::unix::net::UnixListener::bind_addr(&addr))
.await??;
std_listener.set_nonblocking(true)?;
tokio::net::UnixListener::from_std(std_listener)
.map(Listener::Unix)
.map_err(Into::into)
}

async fn tcp_stream(tcp: &Tcp) -> Result<Listener> {
if tcp.nonce_file().is_some() {
bail!("`nonce-tcp` transport is not supported (yet).");
}
info!("Listening on `{}:{}`.", tcp.host(), tcp.port());
let address = (tcp.host(), tcp.port());

Ok(listener)
tokio::net::TcpListener::bind(address)
.await
.map(Listener::Tcp)
.map_err(Into::into)
}

async fn accept_next(&mut self) -> Result<()> {
Expand Down Expand Up @@ -203,35 +233,22 @@ impl Bus {
Ok(())
}

async fn accept(&mut self) -> Result<Box<dyn Socket + 'static>> {
match &mut self.listener {
async fn accept(&mut self) -> Result<BoxedSplit> {
let stream = match &mut self.listener {
#[cfg(unix)]
Listener::Unix {
listener,
socket_path,
} => {
let (unix_stream, _) = listener.accept().await?;
debug!(
"Accepted connection on socket file {}",
socket_path.display()
);

Ok(Box::new(unix_stream))
}
Listener::Tcp { listener } => {
let (tcp_stream, addr) = listener.accept().await?;
debug!("Accepted connection from {addr}");
Listener::Unix(listener) => listener.accept().await.map(|(stream, _)| stream.into())?,
Listener::Tcp(listener) => listener.accept().await.map(|(stream, _)| stream.into())?,
};
debug!("Accepted connection on address `{}`", self.inner.address);

Ok(Box::new(tcp_stream))
}
}
Ok(stream)
}

pub fn peers(&self) -> &Arc<Peers> {
&self.inner.peers
}

pub fn guid(&self) -> &Arc<Guid> {
pub fn guid(&self) -> &OwnedGuid {
&self.inner.guid
}

Expand Down
26 changes: 13 additions & 13 deletions src/fdo/dbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use enumflags2::BitFlags;
use tokio::spawn;
use tracing::warn;
use zbus::{
dbus_interface,
fdo::{
ConnectionCredentials, Error, ReleaseNameReply, RequestNameFlags, RequestNameReply, Result,
},
interface,
names::{BusName, InterfaceName, OwnedBusName, OwnedUniqueName, UniqueName, WellKnownName},
zvariant::Optional,
Guid, MessageHeader, OwnedMatchRule, ResponseDispatchNotifier, SignalContext,
MessageHeader, OwnedGuid, OwnedMatchRule, ResponseDispatchNotifier, SignalContext,
};

use super::msg_sender;
Expand All @@ -22,14 +22,14 @@ use crate::{peer::Peer, peers::Peers};
#[derive(Debug)]
pub struct DBus {
peers: Weak<Peers>,
guid: Arc<Guid>,
guid: OwnedGuid,
}

impl DBus {
pub const PATH: &'static str = "/org/freedesktop/DBus";
pub const INTERFACE: &'static str = "org.freedesktop.DBus";

pub fn new(peers: Arc<Peers>, guid: Arc<Guid>) -> Self {
pub fn new(peers: Arc<Peers>, guid: OwnedGuid) -> Self {
Self {
peers: Arc::downgrade(&peers),
guid,
Expand Down Expand Up @@ -59,7 +59,7 @@ impl DBus {
}
}

#[dbus_interface(interface = "org.freedesktop.DBus")]
#[interface(interface = "org.freedesktop.DBus")]
impl DBus {
/// This is already called & handled and we only need to handle it once.
async fn hello(
Expand Down Expand Up @@ -224,7 +224,7 @@ impl DBus {
}

/// Returns the security context used by SELinux, in an unspecified format.
#[dbus_interface(name = "GetConnectionSELinuxSecurityContext")]
#[zbus(name = "GetConnectionSELinuxSecurityContext")]
async fn get_connection_selinux_security_context(
&self,
bus_name: BusName<'_>,
Expand All @@ -239,7 +239,7 @@ impl DBus {
}

/// Returns the Unix process ID of the process connected to the server.
#[dbus_interface(name = "GetConnectionUnixProcessID")]
#[zbus(name = "GetConnectionUnixProcessID")]
async fn get_connection_unix_process_id(&self, bus_name: BusName<'_>) -> Result<u32> {
self.get_connection_credentials(bus_name.clone())
.await
Expand All @@ -264,7 +264,7 @@ impl DBus {
}

/// Gets the unique ID of the bus.
fn get_id(&self) -> &Guid {
fn get_id(&self) -> &OwnedGuid {
&self.guid
}

Expand Down Expand Up @@ -356,7 +356,7 @@ impl DBus {

/// This property lists abstract “features” provided by the message bus, and can be used by
/// clients to detect the capabilities of the message bus with which they are communicating.
#[dbus_interface(property)]
#[zbus(property)]
fn features(&self) -> &[&str] {
&[]
}
Expand All @@ -373,7 +373,7 @@ impl DBus {
/// `org.freedesktop.DBus` was successful. The standard `org.freedesktop.DBus.Peer` and
/// `org.freedesktop.DBus.Introspectable` interfaces are not included in the value of this
/// property either, because they do not indicate features of the message bus implementation.
#[dbus_interface(property)]
#[zbus(property)]
fn interfaces(&self) -> &[InterfaceName<'_>] {
// TODO: List `org.freedesktop.DBus.Monitoring` when we support it.
&[]
Expand All @@ -382,7 +382,7 @@ impl DBus {
/// This signal indicates that the owner of a name has changed.
///
/// It's also the signal to use to detect the appearance of new names on the bus.
#[dbus_interface(signal)]
#[zbus(signal)]
pub async fn name_owner_changed(
signal_ctxt: &SignalContext<'_>,
name: BusName<'_>,
Expand All @@ -391,11 +391,11 @@ impl DBus {
) -> zbus::Result<()>;

/// This signal is sent to a specific application when it loses ownership of a name.
#[dbus_interface(signal)]
#[zbus(signal)]
pub async fn name_lost(signal_ctxt: &SignalContext<'_>, name: BusName<'_>) -> zbus::Result<()>;

/// This signal is sent to a specific application when it gains ownership of a name.
#[dbus_interface(signal)]
#[zbus(signal)]
pub async fn name_acquired(
signal_ctxt: &SignalContext<'_>,
name: BusName<'_>,
Expand Down
Loading

0 comments on commit ac4c62d

Please sign in to comment.