Skip to content

Commit

Permalink
chore: API alignment
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Sep 18, 2024
1 parent 857f777 commit 8cfba9f
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub(crate) mod zenoh {
pubsub::{Publisher, Reliability, Subscriber},
qos::{CongestionControl, Priority},
query::{
ConsolidationMode, Parameters, Query, QueryTarget, Queryable, Reply, ReplyError,
Selector,
ConsolidationMode, Parameters, Query, QueryConsolidation, QueryTarget, Queryable,
Reply, ReplyError, Selector,
},
sample::{Sample, SampleKind},
scouting::{scout, Hello, Scout},
Expand Down
48 changes: 35 additions & 13 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
key_expr::KeyExpr,
macros::{build, downcast_or_new, enum_mapper, option_wrapper, wrapper},
qos::{CongestionControl, Priority},
utils::{generic, wait, IntoPyResult, IntoPython, MapInto},
utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto},
};

enum_mapper!(zenoh::query::QueryTarget: u8 {
Expand Down Expand Up @@ -53,6 +53,29 @@ impl ConsolidationMode {
const DEFAULT: Self = Self::Auto;
}

wrapper!(zenoh::query::QueryConsolidation: Clone);
downcast_or_new!(QueryConsolidation => ConsolidationMode);

#[pymethods]
impl QueryConsolidation {
#[classattr]
const AUTO: Self = Self(zenoh::query::QueryConsolidation::AUTO);
#[classattr]
const DEFAULT: Self = Self(zenoh::query::QueryConsolidation::DEFAULT);

#[new]
fn new(mode: Option<ConsolidationMode>) -> PyResult<Self> {
let Some(mode) = mode else {
return Ok(Self::DEFAULT);
};
Ok(Self(mode.into_rust().into()))
}

fn mode(&self) -> ConsolidationMode {
self.0.mode().into()
}
}

wrapper!(zenoh::query::Query: Clone);

#[pymethods]
Expand Down Expand Up @@ -308,7 +331,10 @@ downcast_or_new!(Parameters);
#[pymethods]
impl Parameters {
#[new]
pub(crate) fn new(obj: &Bound<PyAny>) -> PyResult<Self> {
pub(crate) fn new(obj: Option<&Bound<PyAny>>) -> PyResult<Self> {
let Some(obj) = obj else {
return Ok(Self(zenoh::query::Parameters::empty()));
};
if let Ok(map) = <HashMap<String, String>>::extract_bound(obj) {
return Ok(Self(map.into()));
}
Expand All @@ -319,24 +345,20 @@ impl Parameters {
self.0.is_empty()
}

fn contains_key(&self, key: String) -> bool {
self.0.contains_key(key)
}

#[pyo3(signature = (key, default = None))]
fn get(&self, key: String, default: Option<String>) -> Option<String> {
fn get(&self, key: &str, default: Option<String>) -> Option<String> {
self.0.get(key).map_into().or(default)
}

fn values(&self, key: String) -> Vec<&str> {
fn values(&self, key: &str) -> Vec<&str> {
self.0.values(key).collect()
}

fn insert(&mut self, key: String, value: String) -> Option<String> {
fn insert(&mut self, key: &str, value: &str) -> Option<String> {
self.0.insert(key, value)
}

fn remove(&mut self, key: String) -> Option<String> {
fn remove(&mut self, key: &str) -> Option<String> {
self.0.remove(key)
}

Expand All @@ -352,11 +374,11 @@ impl Parameters {
!self.0.is_empty()
}

fn __contains__(&self, key: String) -> bool {
self.contains_key(key)
fn __contains__(&self, key: &str) -> bool {
self.0.contains_key(key)
}

fn __getitem__(&self, key: String) -> Option<String> {
fn __getitem__(&self, key: &str) -> Option<String> {
self.get(key, None)
}

Expand Down
17 changes: 13 additions & 4 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use crate::{
macros::{build, with, wrapper},
pubsub::{Publisher, Reliability, Subscriber},
qos::{CongestionControl, Priority},
query::{ConsolidationMode, QueryTarget, Queryable, Reply, Selector},
query::{QueryConsolidation, QueryTarget, Queryable, Reply, Selector},
time::Timestamp,
utils::{wait, IntoPython, MapInto},
};

Expand Down Expand Up @@ -56,12 +57,14 @@ impl Session {
Ok(self.0.zid().into())
}

// TODO HLC

fn close(&self, py: Python) -> PyResult<()> {
wait(py, self.0.close())
}

fn is_closed(&self) -> bool {
self.0.is_closed()
}

fn undeclare(&self, obj: &Bound<PyAny>) -> PyResult<()> {
if let Ok(key_expr) = KeyExpr::from_py(obj) {
return wait(obj.py(), self.0.undeclare(key_expr.0));
Expand All @@ -70,6 +73,10 @@ impl Session {
Ok(())
}

fn new_timestamp(&self) -> Timestamp {
self.0.new_timestamp().into()
}

fn declare_keyexpr(
&self,
py: Python,
Expand Down Expand Up @@ -130,7 +137,9 @@ impl Session {
#[pyo3(from_py_with = "Selector::from_py")] selector: Selector,
handler: Option<&Bound<PyAny>>,
target: Option<QueryTarget>,
consolidation: Option<ConsolidationMode>,
#[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option<
QueryConsolidation,
>,
#[pyo3(from_py_with = "timeout")] timeout: Option<Duration>,
congestion_control: Option<CongestionControl>,
priority: Option<Priority>,
Expand Down
70 changes: 66 additions & 4 deletions src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,76 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::SystemTime;
use std::{
hash::{Hash, Hasher},
time::{Duration, SystemTime},
};

use pyo3::prelude::*;
use pyo3::{prelude::*, types::PyType};

use crate::macros::wrapper;
use crate::{macros::wrapper, utils::IntoPyResult};

wrapper!(zenoh::time::Timestamp: Clone, PartialEq, PartialOrd);
wrapper!(zenoh::time::TimestampId: Copy, Clone, PartialEq, PartialOrd);

#[pymethods]
impl TimestampId {
fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp) -> bool {
match op {
pyo3::pyclass::CompareOp::Lt => self < other,
pyo3::pyclass::CompareOp::Le => self <= other,
pyo3::pyclass::CompareOp::Eq => self == other,
pyo3::pyclass::CompareOp::Ne => self != other,
pyo3::pyclass::CompareOp::Gt => self > other,
pyo3::pyclass::CompareOp::Ge => self >= other,
}
}

fn __bytes__(&self) -> [u8; zenoh::time::TimestampId::MAX_SIZE] {
self.0.to_le_bytes()
}

fn __hash__(&self, py: Python) -> PyResult<isize> {
self.__bytes__().into_py(py).bind(py).hash()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}

fn __str__(&self) -> String {
format!("{}", self.0)
}
}

wrapper!(zenoh::time::Timestamp: Clone, PartialEq, PartialOrd, Hash);

#[pymethods]
impl Timestamp {
fn get_time(&self) -> SystemTime {
self.0.get_time().to_system_time()
}

fn get_id(&self) -> TimestampId {
(*self.0.get_id()).into()
}

fn get_diff_duration(&self, other: Timestamp) -> Duration {
self.0.get_diff_duration(&other.0)
}

fn to_string_rfc3339_lossy(&self) -> String {
self.0.to_string_rfc3339_lossy()
}

#[classmethod]
fn parse_rfc3339(_cls: &Bound<PyType>, s: &str) -> PyResult<Self> {
Ok(Self(
zenoh::time::Timestamp::parse_rfc3339(s)
.map_err(|err| err.cause)
.into_pyres()?,
))
}

fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp) -> bool {
match op {
pyo3::pyclass::CompareOp::Lt => self < other,
Expand All @@ -36,6 +92,12 @@ impl Timestamp {
}
}

fn __hash__(&self) -> u64 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
self.0.hash(&mut hasher);
hasher.finish()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
Expand Down
32 changes: 25 additions & 7 deletions zenoh/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,10 @@ _IntoKeyExpr = KeyExpr | str

@final
class Parameters:
def __new__(cls, parameters: dict[str, str] | str): ...
def __new__(cls, parameters: dict[str, str] | str | None = None): ...
def is_empty(self) -> bool:
"""Returns true if properties does not contain anything."""

def contains_key(self, key: str) -> bool:
"""Returns true if properties contains the specified key."""

def get(self, key: str, default: str | None = None) -> str | None:
"""Returns the value corresponding to the key."""

Expand Down Expand Up @@ -528,6 +525,8 @@ class Query:
@property
def key_expr(self) -> KeyExpr: ...
@property
def parameters(self) -> Parameters: ...
@property
def payload(self) -> ZBytes | None: ...
@property
def encoding(self) -> Encoding | None: ...
Expand Down Expand Up @@ -590,6 +589,16 @@ class Queryable(Generic[_H]):
@overload
def __iter__(self) -> Never: ...

@final
class QueryConsolidation:
AUTO: Self
DEFAULT: Self
def __new__(cls, mode: ConsolidationMode, /) -> Self: ...
@property
def mode(self) -> ConsolidationMode: ...

_IntoQueryConsolidation = ConsolidationMode

@final
class QueryTarget(Enum):
"""The kind of consolidation used."""
Expand Down Expand Up @@ -744,7 +753,16 @@ class Session:
Sessions are automatically closed when dropped, but you may want to use this function to handle errors or close the Session asynchronously.
"""

def is_closed(self) -> bool:
"""Check if the session has been closed."""

def undeclare(self, obj: KeyExpr): ...
def new_timestamp(self) -> Timestamp:
"""Get a new Timestamp from a Zenoh session.
The returned timestamp has the current time, with the session's runtime ZenohId.
"""

def declare_keyexpr(self, key_expr: _IntoKeyExpr):
"""Informs Zenoh that you intend to use the provided key_expr multiple times and that it should optimize its transmission.
The returned KeyExpr's internal structure may differ from what you would have obtained through a simple key_expr.try_into(), to save time on detecting the optimizations that have been associated with it.
Expand Down Expand Up @@ -781,7 +799,7 @@ class Session:
handler: _RustHandler[Reply] | None = None,
*,
target: QueryTarget | None = None,
consolidation: ConsolidationMode | None = None,
consolidation: _IntoQueryConsolidation | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand All @@ -801,7 +819,7 @@ class Session:
handler: _PythonHandler[Reply, _H],
*,
target: QueryTarget | None = None,
consolidation: ConsolidationMode | None = None,
consolidation: _IntoQueryConsolidation | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand All @@ -821,7 +839,7 @@ class Session:
handler: _PythonCallback[Reply],
*,
target: QueryTarget | None = None,
consolidation: ConsolidationMode | None = None,
consolidation: _IntoQueryConsolidation | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand Down

0 comments on commit 8cfba9f

Please sign in to comment.