Skip to content

Commit

Permalink
Merge pull request dbus2#1046 from zeenix/drop-deps
Browse files Browse the repository at this point in the history
`blocking` & `Guid::generate` now feature-gated
  • Loading branch information
zeenix authored Oct 5, 2024
2 parents de03bcd + 9d2d6ec commit bc32cce
Show file tree
Hide file tree
Showing 32 changed files with 179 additions and 119 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions book/src/blocking.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ in panics and hangs. This is not a limitation of zbus but rather a
[well-known general problem][wkgp] in the Rust async/await world. The [`blocking` crate],
[`async-std`][assb] and [`tokio`][tsb] crates provide a easy way around this problem.

**Note:** Since zbus 5.0, blocking API can be disabled through the `blocking-api` cargo feature. If
you use this API, make sure you are not unintentionally disabling it by disabling the default
features in your `Cargo.toml`.

## Establishing a connection

The only difference to that of [asynchronous `Connection` API] is that you use
Expand Down
7 changes: 4 additions & 3 deletions zbus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ camino = ["zvariant/camino"]
# Enables API that is only needed for bus implementations (enables `p2p`).
bus-impl = ["p2p"]
# Enables API that is only needed for peer-to-peer (p2p) connections.
p2p = []
p2p = ["dep:rand"]
async-io = [
"dep:async-io",
"async-executor",
Expand All @@ -38,6 +38,8 @@ async-io = [
tokio = ["dep:tokio"]
vsock = ["dep:vsock", "dep:async-io"]
tokio-vsock = ["dep:tokio-vsock", "tokio"]
# Enable blocking API (default).
blocking-api = ["zbus_macros/blocking-api"]

[dependencies]
zbus_macros = { path = "../zbus_macros", version = "=4.4.0" }
Expand All @@ -49,15 +51,14 @@ serde = { version = "1.0.200", features = ["derive"] }
serde_repr = "0.1.19"
enumflags2 = { version = "0.7.9", features = ["serde"] }
futures-core = "0.3.30"
futures-sink = "0.3.30"
futures-util = { version = "0.3.30", default-features = false, features = [
"sink",
"std",
] }
async-broadcast = "0.7.0"
hex = "0.4.3"
ordered-stream = "0.2"
rand = "0.8.5"
rand = { version = "0.8.5", optional = true }
event-listener = "5.3.0"
static_assertions = "1.1.0"
async-trait = "0.1.80"
Expand Down
3 changes: 2 additions & 1 deletion zbus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ async fn main() -> Result<()> {
## Blocking API

While zbus is primarily asynchronous (since 2.0), [blocking wrappers][bw] are provided for
convenience.
convenience. Since zbus 5.0, blocking API can be disabled by disabling the `blocking-api` cargo
feature.

## Compatibility with async runtimes

Expand Down
11 changes: 10 additions & 1 deletion zbus/src/blocking/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,16 @@ impl Connection {
///
/// The `ObjectServer` is created on-demand.
pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
self.inner.sync_object_server(true, None)
struct Wrapper(ObjectServer);
impl Deref for Wrapper {
type Target = ObjectServer;

fn deref(&self) -> &Self::Target {
&self.0
}
}

Wrapper(ObjectServer::new(&self.inner))
}

/// Get a reference to the underlying async Connection.
Expand Down
6 changes: 4 additions & 2 deletions zbus/src/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
//!
//! This module hosts all our blocking API. All the types under this module are thin wrappers
//! around the corresponding asynchronous types. Most of the method calls are simply calling their
//! asynchronous counterparts on the underlying types and use [`async_io::block_on`] to turn them
//! into blocking calls.
//! asynchronous counterparts on the underlying types and use [`async_io::block_on`] (or
//! [`tokio::runtime::Runtime::block_on`]) to turn them into blocking calls.
//!
//! This module is only available when the `blocking-api` feature is enabled (default).
//!
//! # Caveats
//!
Expand Down
4 changes: 2 additions & 2 deletions zbus/src/blocking/object_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ where
/// quit_listener.wait();
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ObjectServer {
azync: crate::ObjectServer,
}
Expand All @@ -138,7 +138,7 @@ impl ObjectServer {
/// Create a new D-Bus `ObjectServer`.
pub(crate) fn new(conn: &crate::Connection) -> Self {
Self {
azync: crate::ObjectServer::new(conn),
azync: conn.object_server().clone(),
}
}

Expand Down
3 changes: 1 addition & 2 deletions zbus/src/connection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,10 @@ impl<'a> Builder<'a> {
conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));

if !self.interfaces.is_empty() {
let object_server = conn.sync_object_server(false, None);
let object_server = conn.ensure_object_server(false);
for (path, interfaces) in self.interfaces {
for (name, iface) in interfaces {
let added = object_server
.inner()
.add_arc_interface(path.clone(), name.clone(), iface.clone())
.await?;
if !added {
Expand Down
38 changes: 9 additions & 29 deletions zbus/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
collections::HashMap,
io::{self, ErrorKind},
num::NonZeroU32,
ops::Deref,
pin::Pin,
sync::{Arc, OnceLock, Weak},
task::{Context, Poll},
Expand All @@ -22,7 +21,6 @@ use futures_util::StreamExt;

use crate::{
async_lock::{Mutex, Semaphore, SemaphorePermit},
blocking,
fdo::{self, ConnectionCredentials, RequestNameFlags, RequestNameReply},
is_flatpak,
message::{Flags, Message, Type},
Expand Down Expand Up @@ -76,7 +74,7 @@ pub(crate) struct ConnectionInner {

subscriptions: Mutex<Subscriptions>,

object_server: OnceLock<blocking::ObjectServer>,
object_server: OnceLock<ObjectServer>,
object_server_dispatch_task: OnceLock<Task<()>>,

drop_event: Event,
Expand Down Expand Up @@ -913,41 +911,22 @@ impl Connection {
/// **Note**: Once the `ObjectServer` is created, it will be replying to all method calls
/// received on `self`. If you want to manually reply to method calls, do not use this
/// method (or any of the `ObjectServer` related API).
pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
// FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for
// crate::ObjectServer instead of this wrapper?
struct Wrapper<'a>(&'a blocking::ObjectServer);
impl<'a> Deref for Wrapper<'a> {
type Target = ObjectServer;

fn deref(&self) -> &Self::Target {
self.0.inner()
}
}

Wrapper(self.sync_object_server(true, None))
pub fn object_server(&self) -> &ObjectServer {
self.ensure_object_server(true)
}

pub(crate) fn sync_object_server(
&self,
start: bool,
started_event: Option<Event>,
) -> &blocking::ObjectServer {
pub(crate) fn ensure_object_server(&self, start: bool) -> &ObjectServer {
self.inner
.object_server
.get_or_init(move || self.setup_object_server(start, started_event))
.get_or_init(move || self.setup_object_server(start, None))
}

fn setup_object_server(
&self,
start: bool,
started_event: Option<Event>,
) -> blocking::ObjectServer {
fn setup_object_server(&self, start: bool, started_event: Option<Event>) -> ObjectServer {
if start {
self.start_object_server(started_event);
}

blocking::ObjectServer::new(self)
ObjectServer::new(self)
}

#[instrument(skip(self))]
Expand Down Expand Up @@ -1325,14 +1304,15 @@ impl Connection {
}
}

#[cfg(feature = "blocking-api")]
impl From<crate::blocking::Connection> for Connection {
fn from(conn: crate::blocking::Connection) -> Self {
conn.into_inner()
}
}

// Internal API that allows keeping a weak connection ref around.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct WeakConnection {
inner: Weak<ConnectionInner>,
}
Expand Down
1 change: 1 addition & 0 deletions zbus/src/fdo/dbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,5 @@ pub trait DBus {
}

assert_impl_all!(DBusProxy<'_>: Send, Sync, Unpin);
#[cfg(feature = "blocking-api")]
assert_impl_all!(DBusProxyBlocking<'_>: Send, Sync, Unpin);
1 change: 1 addition & 0 deletions zbus/src/fdo/introspectable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ impl Introspectable {
}

assert_impl_all!(IntrospectableProxy<'_>: Send, Sync, Unpin);
#[cfg(feature = "blocking-api")]
assert_impl_all!(IntrospectableProxyBlocking<'_>: Send, Sync, Unpin);
15 changes: 10 additions & 5 deletions zbus/src/fdo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,17 @@ mod tests {
#[test]
#[timeout(15000)]
fn no_object_manager_signals_before_hello() {
use zbus::blocking;
crate::block_on(no_object_manager_signals_before_hello_async());
}

async fn no_object_manager_signals_before_hello_async() {
// We were emitting `InterfacesAdded` signals before `Hello` was called, which is wrong and
// results in us getting disconnected by the bus. This test case ensures we don't do that
// and also that the signals are eventually emitted.

// Let's first create an interator to get the signals (it has to be another connection).
let conn = blocking::Connection::session().unwrap();
let mut iterator = blocking::MessageIterator::for_match_rule(
let conn = zbus::Connection::session().await.unwrap();
let mut stream = zbus::MessageStream::for_match_rule(
zbus::MatchRule::builder()
.msg_type(zbus::message::Type::Signal)
.interface("org.freedesktop.DBus.ObjectManager")
Expand All @@ -153,6 +156,7 @@ mod tests {
&conn,
None,
)
.await
.unwrap();

// Now create the service side.
Expand All @@ -164,7 +168,7 @@ mod tests {
"test".into()
}
}
let _conn = blocking::connection::Builder::session()
let _conn = zbus::conn::Builder::session()
.unwrap()
.name("org.zbus.NoObjectManagerSignalsBeforeHello")
.unwrap()
Expand All @@ -176,10 +180,11 @@ mod tests {
)
.unwrap()
.build()
.await
.unwrap();

// Let's see if the `InterfacesAdded` signal was emitted.
let msg = iterator.next().unwrap().unwrap();
let msg = stream.next().await.unwrap().unwrap();
let signal = super::InterfacesAdded::from_message(msg).unwrap();
assert_eq!(
signal.args().unwrap().interfaces_and_properties,
Expand Down
1 change: 1 addition & 0 deletions zbus/src/fdo/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ pub trait Monitoring {
}

assert_impl_all!(MonitoringProxy<'_>: Send, Sync, Unpin);
#[cfg(feature = "blocking-api")]
assert_impl_all!(MonitoringProxyBlocking<'_>: Send, Sync, Unpin);
1 change: 1 addition & 0 deletions zbus/src/fdo/object_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,5 @@ impl ObjectManager {
}

assert_impl_all!(ObjectManagerProxy<'_>: Send, Sync, Unpin);
#[cfg(feature = "blocking-api")]
assert_impl_all!(ObjectManagerProxyBlocking<'_>: Send, Sync, Unpin);
1 change: 1 addition & 0 deletions zbus/src/fdo/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ impl Peer {
}

assert_impl_all!(PeerProxy<'_>: Send, Sync, Unpin);
#[cfg(feature = "blocking-api")]
assert_impl_all!(PeerProxyBlocking<'_>: Send, Sync, Unpin);
1 change: 1 addition & 0 deletions zbus/src/fdo/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,5 @@ impl Properties {
}

assert_impl_all!(PropertiesProxy<'_>: Send, Sync, Unpin);
#[cfg(feature = "blocking-api")]
assert_impl_all!(PropertiesProxyBlocking<'_>: Send, Sync, Unpin);
1 change: 1 addition & 0 deletions zbus/src/fdo/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ pub trait Stats {
}

assert_impl_all!(StatsProxy<'_>: Send, Sync, Unpin);
#[cfg(feature = "blocking-api")]
assert_impl_all!(StatsProxyBlocking<'_>: Send, Sync, Unpin);
12 changes: 8 additions & 4 deletions zbus/src/guid.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{
borrow::{Borrow, Cow},
fmt::{self, Debug, Display, Formatter},
iter::repeat_with,
ops::Deref,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};

use serde::{de, Deserialize, Serialize};
Expand All @@ -27,9 +25,14 @@ assert_impl_all!(Guid<'_>: Send, Sync, Unpin);
impl Guid<'_> {
/// Generate a D-Bus GUID that can be used with e.g.
/// [`connection::Builder::server`](crate::connection::Builder::server).
///
/// This method is only available when the `p2p` feature is enabled (disabled by default).
#[cfg(feature = "p2p")]
pub fn generate() -> Guid<'static> {
let r: Vec<u32> = repeat_with(rand::random::<u32>).take(3).collect();
let r3 = match SystemTime::now().duration_since(UNIX_EPOCH) {
let r: Vec<u32> = std::iter::repeat_with(rand::random::<u32>)
.take(3)
.collect();
let r3 = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(n) => n.as_secs() as u32,
Err(_) => rand::random::<u32>(),
};
Expand Down Expand Up @@ -245,6 +248,7 @@ impl Display for OwnedGuid {
}

#[cfg(test)]
#[cfg(feature = "p2p")]
mod tests {
use crate::Guid;
use test_log::test;
Expand Down
1 change: 1 addition & 0 deletions zbus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub use utils::*;
#[macro_use]
pub mod fdo;

#[cfg(feature = "blocking-api")]
pub mod blocking;

pub use zbus_macros::{interface, proxy, DBusError};
Expand Down
9 changes: 6 additions & 3 deletions zbus/src/object_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ pub(crate) use node::Node;
/// # })?;
/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ObjectServer {
conn: WeakConnection,
root: RwLock<Node>,
root: Arc<RwLock<Node>>,
}

assert_impl_all!(ObjectServer: Send, Sync, Unpin);
Expand All @@ -101,7 +101,9 @@ impl ObjectServer {
pub(crate) fn new(conn: &Connection) -> Self {
Self {
conn: conn.into(),
root: RwLock::new(Node::new("/".try_into().expect("zvariant bug"))),
root: Arc::new(RwLock::new(Node::new(
"/".try_into().expect("zvariant bug"),
))),
}
}

Expand Down Expand Up @@ -445,6 +447,7 @@ impl ObjectServer {
}
}

#[cfg(feature = "blocking-api")]
impl From<crate::blocking::ObjectServer> for ObjectServer {
fn from(server: crate::blocking::ObjectServer) -> Self {
server.into_inner()
Expand Down
1 change: 1 addition & 0 deletions zbus/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,7 @@ impl AsyncDrop for SignalStream<'_> {
}
}

#[cfg(feature = "blocking-api")]
impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
proxy.into_inner()
Expand Down
Loading

0 comments on commit bc32cce

Please sign in to comment.