diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f9a9375..9d3407f3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,10 +33,10 @@ jobs: run: cargo fmt --check -- --config "unstable_features=true,imports_granularity=Crate,group_imports=StdExternalCrate" - name: Clippy no-default-features - run: cargo clippy --no-default-features --all-targets -- --deny warnings + run: cargo +stable clippy --no-default-features --all-targets -- --deny warnings - name: Clippy - run: cargo clippy --all-features --all-targets -- --deny warnings + run: cargo +stable clippy --all-features --all-targets -- --deny warnings build: needs: check diff --git a/src/lib.rs b/src/lib.rs index 5ee90d56..891afb50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,8 +60,8 @@ pub(crate) mod zenoh { pubsub::{Publisher, Reliability, Subscriber}, qos::{CongestionControl, Priority}, query::{ - ConsolidationMode, Parameters, Query, QueryTarget, Queryable, Reply, ReplyError, - Selector, + ConsolidationMode, Parameters, Query, QueryConsolidation, QueryTarget, Queryable, + Reply, ReplyError, Selector, }, sample::{Sample, SampleKind}, scouting::{scout, Hello, Scout}, diff --git a/src/query.rs b/src/query.rs index a0135987..8a81ad95 100644 --- a/src/query.rs +++ b/src/query.rs @@ -25,7 +25,7 @@ use crate::{ key_expr::KeyExpr, macros::{build, downcast_or_new, enum_mapper, option_wrapper, wrapper}, qos::{CongestionControl, Priority}, - utils::{generic, wait, IntoPyResult, IntoPython, MapInto}, + utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto}, }; enum_mapper!(zenoh::query::QueryTarget: u8 { @@ -53,6 +53,29 @@ impl ConsolidationMode { const DEFAULT: Self = Self::Auto; } +wrapper!(zenoh::query::QueryConsolidation: Clone); +downcast_or_new!(QueryConsolidation => ConsolidationMode); + +#[pymethods] +impl QueryConsolidation { + #[classattr] + const AUTO: Self = Self(zenoh::query::QueryConsolidation::AUTO); + #[classattr] + const DEFAULT: Self = Self(zenoh::query::QueryConsolidation::DEFAULT); + + #[new] + fn new(mode: Option) -> PyResult { + let Some(mode) = mode else { + return Ok(Self::DEFAULT); + }; + Ok(Self(mode.into_rust().into())) + } + + fn mode(&self) -> ConsolidationMode { + self.0.mode().into() + } +} + wrapper!(zenoh::query::Query: Clone); #[pymethods] @@ -308,7 +331,10 @@ downcast_or_new!(Parameters); #[pymethods] impl Parameters { #[new] - pub(crate) fn new(obj: &Bound) -> PyResult { + pub(crate) fn new(obj: Option<&Bound>) -> PyResult { + let Some(obj) = obj else { + return Ok(Self(zenoh::query::Parameters::empty())); + }; if let Ok(map) = >::extract_bound(obj) { return Ok(Self(map.into())); } @@ -319,24 +345,20 @@ impl Parameters { self.0.is_empty() } - fn contains_key(&self, key: String) -> bool { - self.0.contains_key(key) - } - #[pyo3(signature = (key, default = None))] - fn get(&self, key: String, default: Option) -> Option { + fn get(&self, key: &str, default: Option) -> Option { self.0.get(key).map_into().or(default) } - fn values(&self, key: String) -> Vec<&str> { + fn values(&self, key: &str) -> Vec<&str> { self.0.values(key).collect() } - fn insert(&mut self, key: String, value: String) -> Option { + fn insert(&mut self, key: &str, value: &str) -> Option { self.0.insert(key, value) } - fn remove(&mut self, key: String) -> Option { + fn remove(&mut self, key: &str) -> Option { self.0.remove(key) } @@ -352,11 +374,11 @@ impl Parameters { !self.0.is_empty() } - fn __contains__(&self, key: String) -> bool { - self.contains_key(key) + fn __contains__(&self, key: &str) -> bool { + self.0.contains_key(key) } - fn __getitem__(&self, key: String) -> Option { + fn __getitem__(&self, key: &str) -> Option { self.get(key, None) } diff --git a/src/session.rs b/src/session.rs index fd1dea43..d85459cc 100644 --- a/src/session.rs +++ b/src/session.rs @@ -28,7 +28,8 @@ use crate::{ macros::{build, with, wrapper}, pubsub::{Publisher, Reliability, Subscriber}, qos::{CongestionControl, Priority}, - query::{ConsolidationMode, QueryTarget, Queryable, Reply, Selector}, + query::{QueryConsolidation, QueryTarget, Queryable, Reply, Selector}, + time::Timestamp, utils::{wait, IntoPython, MapInto}, }; @@ -56,12 +57,14 @@ impl Session { Ok(self.0.zid().into()) } - // TODO HLC - fn close(&self, py: Python) -> PyResult<()> { wait(py, self.0.close()) } + fn is_closed(&self) -> bool { + self.0.is_closed() + } + fn undeclare(&self, obj: &Bound) -> PyResult<()> { if let Ok(key_expr) = KeyExpr::from_py(obj) { return wait(obj.py(), self.0.undeclare(key_expr.0)); @@ -70,6 +73,10 @@ impl Session { Ok(()) } + fn new_timestamp(&self) -> Timestamp { + self.0.new_timestamp().into() + } + fn declare_keyexpr( &self, py: Python, @@ -130,7 +137,9 @@ impl Session { #[pyo3(from_py_with = "Selector::from_py")] selector: Selector, handler: Option<&Bound>, target: Option, - consolidation: Option, + #[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option< + QueryConsolidation, + >, #[pyo3(from_py_with = "timeout")] timeout: Option, congestion_control: Option, priority: Option, diff --git a/src/time.rs b/src/time.rs index b3d116b5..fa7958e1 100644 --- a/src/time.rs +++ b/src/time.rs @@ -11,13 +11,48 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::time::SystemTime; +use std::{ + hash::{Hash, Hasher}, + time::{Duration, SystemTime}, +}; -use pyo3::prelude::*; +use pyo3::{prelude::*, types::PyType}; -use crate::macros::wrapper; +use crate::{macros::wrapper, utils::IntoPyResult}; -wrapper!(zenoh::time::Timestamp: Clone, PartialEq, PartialOrd); +wrapper!(zenoh::time::TimestampId: Copy, Clone, PartialEq, PartialOrd); + +#[pymethods] +impl TimestampId { + fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp) -> bool { + match op { + pyo3::pyclass::CompareOp::Lt => self < other, + pyo3::pyclass::CompareOp::Le => self <= other, + pyo3::pyclass::CompareOp::Eq => self == other, + pyo3::pyclass::CompareOp::Ne => self != other, + pyo3::pyclass::CompareOp::Gt => self > other, + pyo3::pyclass::CompareOp::Ge => self >= other, + } + } + + fn __bytes__(&self) -> [u8; zenoh::time::TimestampId::MAX_SIZE] { + self.0.to_le_bytes() + } + + fn __hash__(&self, py: Python) -> PyResult { + self.__bytes__().into_py(py).bind(py).hash() + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } + + fn __str__(&self) -> String { + format!("{}", self.0) + } +} + +wrapper!(zenoh::time::Timestamp: Clone, PartialEq, PartialOrd, Hash); #[pymethods] impl Timestamp { @@ -25,6 +60,27 @@ impl Timestamp { self.0.get_time().to_system_time() } + fn get_id(&self) -> TimestampId { + (*self.0.get_id()).into() + } + + fn get_diff_duration(&self, other: Timestamp) -> Duration { + self.0.get_diff_duration(&other.0) + } + + fn to_string_rfc3339_lossy(&self) -> String { + self.0.to_string_rfc3339_lossy() + } + + #[classmethod] + fn parse_rfc3339(_cls: &Bound, s: &str) -> PyResult { + Ok(Self( + zenoh::time::Timestamp::parse_rfc3339(s) + .map_err(|err| err.cause) + .into_pyres()?, + )) + } + fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp) -> bool { match op { pyo3::pyclass::CompareOp::Lt => self < other, @@ -36,6 +92,12 @@ impl Timestamp { } } + fn __hash__(&self) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + self.0.hash(&mut hasher); + hasher.finish() + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } diff --git a/src/utils.rs b/src/utils.rs index fd9c2505..20e04f29 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -93,34 +93,6 @@ pub(crate) fn generic(cls: &Bound, args: &Bound) -> PyObject { .unbind() } -pub(crate) struct TryProcessIter<'a, I, E> { - iter: I, - error: &'a mut Option, -} - -impl>, T, E> Iterator for TryProcessIter<'_, I, E> { - type Item = T; - - fn next(&mut self) -> Option { - match self.iter.next() { - Some(Ok(x)) => Some(x), - Some(Err(err)) => { - *self.error = Some(err); - None - } - None => None, - } - } - - fn size_hint(&self) -> (usize, Option) { - if self.error.is_some() { - (0, Some(0)) - } else { - self.iter.size_hint() - } - } -} - pub(crate) fn short_type_name() -> &'static str { let name = std::any::type_name::(); name.rsplit_once("::").map_or(name, |(_, name)| name) @@ -128,7 +100,7 @@ pub(crate) fn short_type_name() -> &'static str { pub(crate) fn wait( py: Python, - resolve: impl zenoh::Resolve> + Send, + resolve: impl zenoh::Resolve>, ) -> PyResult { py.allow_threads(|| resolve.wait()).into_pyres() } diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 0d11b7e0..32d411e8 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -438,13 +438,10 @@ _IntoKeyExpr = KeyExpr | str @final class Parameters: - def __new__(cls, parameters: dict[str, str] | str): ... + def __new__(cls, parameters: dict[str, str] | str | None = None): ... def is_empty(self) -> bool: """Returns true if properties does not contain anything.""" - def contains_key(self, key: str) -> bool: - """Returns true if properties contains the specified key.""" - def get(self, key: str, default: str | None = None) -> str | None: """Returns the value corresponding to the key.""" @@ -528,6 +525,8 @@ class Query: @property def key_expr(self) -> KeyExpr: ... @property + def parameters(self) -> Parameters: ... + @property def payload(self) -> ZBytes | None: ... @property def encoding(self) -> Encoding | None: ... @@ -590,6 +589,16 @@ class Queryable(Generic[_H]): @overload def __iter__(self) -> Never: ... +@final +class QueryConsolidation: + AUTO: Self + DEFAULT: Self + def __new__(cls, mode: ConsolidationMode, /) -> Self: ... + @property + def mode(self) -> ConsolidationMode: ... + +_IntoQueryConsolidation = ConsolidationMode + @final class QueryTarget(Enum): """The kind of consolidation used.""" @@ -744,7 +753,16 @@ class Session: Sessions are automatically closed when dropped, but you may want to use this function to handle errors or close the Session asynchronously. """ + def is_closed(self) -> bool: + """Check if the session has been closed.""" + def undeclare(self, obj: KeyExpr): ... + def new_timestamp(self) -> Timestamp: + """Get a new Timestamp from a Zenoh session. + + The returned timestamp has the current time, with the session's runtime ZenohId. + """ + def declare_keyexpr(self, key_expr: _IntoKeyExpr): """Informs Zenoh that you intend to use the provided key_expr multiple times and that it should optimize its transmission. The returned KeyExpr's internal structure may differ from what you would have obtained through a simple key_expr.try_into(), to save time on detecting the optimizations that have been associated with it. @@ -781,7 +799,7 @@ class Session: handler: _RustHandler[Reply] | None = None, *, target: QueryTarget | None = None, - consolidation: ConsolidationMode | None = None, + consolidation: _IntoQueryConsolidation | None = None, timeout: float | int | None = None, congestion_control: CongestionControl | None = None, priority: Priority | None = None, @@ -801,7 +819,7 @@ class Session: handler: _PythonHandler[Reply, _H], *, target: QueryTarget | None = None, - consolidation: ConsolidationMode | None = None, + consolidation: _IntoQueryConsolidation | None = None, timeout: float | int | None = None, congestion_control: CongestionControl | None = None, priority: Priority | None = None, @@ -821,7 +839,7 @@ class Session: handler: _PythonCallback[Reply], *, target: QueryTarget | None = None, - consolidation: ConsolidationMode | None = None, + consolidation: _IntoQueryConsolidation | None = None, timeout: float | int | None = None, congestion_control: CongestionControl | None = None, priority: Priority | None = None,