diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs index 2b76bd359..3eb0ed86f 100644 --- a/src/querying_subscriber.rs +++ b/src/querying_subscriber.rs @@ -16,7 +16,7 @@ use std::mem::MaybeUninit; use zenoh::{ bytes::EncodingBuilderTrait, prelude::SessionDeclarations, pubsub::Reliability, - sample::SampleBuilderTrait, session::Session, Wait, + qos::QoSBuilderTrait, sample::SampleBuilderTrait, session::Session, Wait, }; use zenoh_ext::*; @@ -170,7 +170,7 @@ pub unsafe extern "C" fn ze_querying_subscriber_get( .0 .fetch({ move |cb| { - let mut get = session.get(selector).callback(cb); + let mut get = session.get(selector); if let Some(options) = options { if let Some(payload) = options.payload.take() { @@ -179,24 +179,33 @@ pub unsafe extern "C" fn ze_querying_subscriber_get( if let Some(encoding) = options.encoding.take() { get = get.encoding(encoding.take_rust_type()); } - #[cfg(feature = "unstable")] - if let Some(source_info) = options.source_info.take() { - get = get.source_info(source_info.take_rust_type()); - } if let Some(attachment) = options.attachment.take() { get = get.attachment(attachment.take_rust_type()); } get = get .consolidation(options.consolidation) - .target(options.target.into()); + .target(options.target.into()) + .congestion_control(options.congestion_control.into()) + .priority(options.priority.into()) + .express(options.is_express); + + #[cfg(feature = "unstable")] + { + if let Some(source_info) = options.source_info.take() { + get = get.source_info(source_info.take_rust_type()); + } + get = get + .allowed_destination(options.allowed_destination.into()) + .accept_replies(options.accept_replies.into()); + } if options.timeout_ms != 0 { get = get.timeout(std::time::Duration::from_millis(options.timeout_ms)); } } - get.wait() + get.callback(cb).wait() } }) .wait()