Skip to content

Commit

Permalink
WIP: Update to zbus git
Browse files Browse the repository at this point in the history
There is an issue with non-Send future
  • Loading branch information
elmarco committed Oct 2, 2024
1 parent 6ae240f commit 0be27c0
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 164 deletions.
38 changes: 12 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ name = "busd"
path = "src/bin/busd.rs"

[dependencies]
#zbus = { git = "https://github.com/dbus2/zbus/", features = [
zbus = { version = "4.2.0", features = [
zbus = { git = "https://github.com/dbus2/zbus/", features = [
#zbus = { version = "4.2.0", features = [
"tokio",
"bus-impl",
], default-features = false }
Expand Down
27 changes: 25 additions & 2 deletions src/bin/busd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ async fn main() -> Result<()> {
busd::tracing_subscriber::init();

let args = Args::parse();
let address = args.address.unwrap_or_else(default_address);

let mut bus =
bus::Bus::for_address(args.address.as_deref(), args.auth_mechanism.into()).await?;
let mut bus = bus::Bus::for_address(&address, args.auth_mechanism.into()).await?;

#[cfg(unix)]
if let Some(fd) = args.ready_fd {
Expand Down Expand Up @@ -111,3 +111,26 @@ async fn main() -> Result<()> {

Ok(())
}

#[cfg(unix)]
fn default_address() -> String {
use std::{env, path::Path};

let runtime_dir = env::var("XDG_RUNTIME_DIR")
.as_ref()
.map(|s| Path::new(s).to_path_buf())
.ok()
.unwrap_or_else(|| {
Path::new("/run")
.join("user")
.join(format!("{}", nix::unistd::Uid::current()))
});
let path = runtime_dir.join("busd-session");

format!("unix:path={}", path.display())
}

#[cfg(not(unix))]
fn default_address() -> String {
"tcp:host=127.0.0.1,port=4242".to_string()
}
130 changes: 56 additions & 74 deletions src/bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@ use futures_util::{
future::{select, Either},
pin_mut,
};
#[cfg(unix)]
use std::{env, path::Path};
use std::{str::FromStr, sync::Arc};
use std::sync::Arc;
#[cfg(unix)]
use tokio::fs::remove_file;
use tokio::{spawn, task::JoinHandle};
use tokio::task::{spawn, JoinHandle};
use tracing::{debug, info, trace, warn};
#[cfg(unix)]
use zbus::address::transport::{Unix, UnixSocket};
use zbus::address::transport::Unix;
use zbus::{
address::{transport::Tcp, Transport},
connection::socket::BoxedSplit,
Address, AuthMechanism, Connection, ConnectionBuilder, Guid, OwnedGuid,
address::transport::{Tcp, Transport},
connection::{self, socket::BoxedSplit},
Address, AuthMechanism, Connection, Guid, OwnedGuid,
};

use crate::{
Expand All @@ -36,7 +34,7 @@ pub struct Bus {
// All (cheaply) cloneable fields of `Bus` go here.
#[derive(Clone, Debug)]
pub struct Inner {
address: Address,
address: String,
peers: Arc<Peers>,
guid: OwnedGuid,
next_id: usize,
Expand All @@ -52,21 +50,18 @@ enum Listener {
}

impl Bus {
pub async fn for_address(address: Option<&str>, auth_mechanism: AuthMechanism) -> Result<Self> {
let mut address = match address {
Some(address) => Address::from_str(address)?,
None => Address::from_str(&default_address())?,
};
let guid: OwnedGuid = match address.guid() {
Some(guid) => guid.to_owned().into(),
pub async fn for_address(address: &str, auth_mechanism: AuthMechanism) -> Result<Self> {
let address = Address::try_from(address)?;
let (guid, addr) = match address.guid() {
Some(guid) => (Guid::try_from(guid)?.to_owned().into(), address.to_string()),
None => {
let guid = Guid::generate();
address = address.set_guid(guid.clone())?;

guid.into()
let guid: OwnedGuid = Guid::generate().into();
let addr = format!("{address},guid={guid}");
(guid, addr)
}
};
let listener = match address.transport() {

let listener = match &address.transport()? {
#[cfg(unix)]
Transport::Unix(unix) => Self::unix_stream(unix).await,
Transport::Tcp(tcp) => Self::tcp_stream(tcp).await,
Expand All @@ -83,15 +78,15 @@ impl Bus {
// Create a peer for ourselves.
trace!("Creating self-dial connection.");
let (client_socket, peer_socket) = zbus::connection::socket::Channel::pair();
let service_conn = ConnectionBuilder::authenticated_socket(client_socket, guid.clone())?
let service_conn = connection::Builder::authenticated_socket(client_socket, guid.clone())?
.p2p()
.unique_name(fdo::BUS_NAME)?
.name(fdo::BUS_NAME)?
.serve_at(fdo::DBus::PATH, dbus)?
.serve_at(fdo::Monitoring::PATH, monitoring)?
.build()
.await?;
let peer_conn = ConnectionBuilder::authenticated_socket(peer_socket, guid.clone())?
let peer_conn = connection::Builder::authenticated_socket(peer_socket, guid.clone())?
.p2p()
.build()
.await?;
Expand All @@ -112,7 +107,7 @@ impl Bus {
listener,
cookie_task,
inner: Inner {
address,
address: addr,
peers,
guid,
next_id: 0,
Expand All @@ -122,7 +117,7 @@ impl Bus {
})
}

pub fn address(&self) -> &Address {
pub fn address(&self) -> &str {
&self.inner.address
}

Expand All @@ -134,38 +129,40 @@ impl Bus {

// AsyncDrop would have been nice!
pub async fn cleanup(self) -> Result<()> {
match self.inner.address.transport() {
#[cfg(unix)]
Transport::Unix(unix) => match unix.path() {
UnixSocket::File(path) => remove_file(path).await.map_err(Into::into),
_ => Ok(()),
},
_ => Ok(()),
#[cfg(unix)]
if let Listener::Unix(unix_listener) = self.listener {
let addr = unix_listener.local_addr()?;
if let Some(path) = addr.as_pathname() {
remove_file(path).await?;
}
}

Ok(())
}

#[cfg(unix)]
async fn unix_stream(unix: &Unix) -> Result<Listener> {
async fn unix_stream(unix: &Unix<'_>) -> Result<Listener> {
// TODO: Use tokio::net::UnixListener directly once it supports abstract sockets:
//
// https://github.com/tokio-rs/tokio/issues/4610

use std::os::unix::net::SocketAddr;
use zbus::address::transport::UnixAddrKind;

let addr = match unix.path() {
let addr = match unix.kind() {
#[cfg(target_os = "linux")]
UnixSocket::Abstract(name) => {
UnixAddrKind::Abstract(name) => {
use std::os::linux::net::SocketAddrExt;

let addr = SocketAddr::from_abstract_name(name.as_encoded_bytes())?;
let addr = SocketAddr::from_abstract_name(name)?;
info!(
"Listening on abstract UNIX socket `{}`.",
name.to_string_lossy()
String::from_utf8_lossy(name)
);

addr
}
UnixSocket::File(path) => {
UnixAddrKind::Path(path) => {
let addr = SocketAddr::from_pathname(path)?;
info!(
"Listening on UNIX socket file `{}`.",
Expand All @@ -174,8 +171,6 @@ impl Bus {

addr
}
UnixSocket::Dir(_) => bail!("`dir` transport is not supported (yet)."),
UnixSocket::TmpDir(_) => bail!("`tmpdir` transport is not supported (yet)."),
_ => bail!("Unsupported address."),
};
let std_listener =
Expand All @@ -187,12 +182,15 @@ impl Bus {
.map_err(Into::into)
}

async fn tcp_stream(tcp: &Tcp) -> Result<Listener> {
if tcp.nonce_file().is_some() {
bail!("`nonce-tcp` transport is not supported (yet).");
}
info!("Listening on `{}:{}`.", tcp.host(), tcp.port());
let address = (tcp.host(), tcp.port());
async fn tcp_stream(tcp: &Tcp<'_>) -> Result<Listener> {
let Some(host) = tcp.host() else {
bail!("No host= provided.");
};
let Some(port) = tcp.port() else {
bail!("No port= provided.");
};
info!("Listening on `{}:{}`.", host, port);
let address = (host, port);

tokio::net::TcpListener::bind(address)
.await
Expand Down Expand Up @@ -234,14 +232,19 @@ impl Bus {
}

async fn accept(&mut self) -> Result<BoxedSplit> {
let stream = match &mut self.listener {
match &mut self.listener {
#[cfg(unix)]
Listener::Unix(listener) => listener.accept().await.map(|(stream, _)| stream.into())?,
Listener::Tcp(listener) => listener.accept().await.map(|(stream, _)| stream.into())?,
};
debug!("Accepted connection on address `{}`", self.inner.address);

Ok(stream)
Listener::Unix(listener) => {
let (stream, addr) = listener.accept().await?;
debug!("Accepted Unix connection from address `{:?}`", addr);
Ok(stream.into())
}
Listener::Tcp(listener) => {
let (stream, addr) = listener.accept().await?;
debug!("Accepted TCP connection from address `{}`", addr);
Ok(stream.into())
}
}
}

pub fn peers(&self) -> &Arc<Peers> {
Expand All @@ -262,24 +265,3 @@ impl Bus {
self.inner.next_id
}
}

#[cfg(unix)]
fn default_address() -> String {
let runtime_dir = env::var("XDG_RUNTIME_DIR")
.as_ref()
.map(|s| Path::new(s).to_path_buf())
.ok()
.unwrap_or_else(|| {
Path::new("/run")
.join("user")
.join(format!("{}", nix::unistd::Uid::current()))
});
let path = runtime_dir.join("busd-session");

format!("unix:path={}", path.display())
}

#[cfg(not(unix))]
fn default_address() -> String {
"tcp:host=127.0.0.1,port=4242".to_string()
}
Loading

0 comments on commit 0be27c0

Please sign in to comment.