Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: API alignment #330

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ jobs:
run: cargo fmt --check -- --config "unstable_features=true,imports_granularity=Crate,group_imports=StdExternalCrate"

- name: Clippy no-default-features
run: cargo clippy --no-default-features --all-targets -- --deny warnings
run: cargo +stable clippy --no-default-features --all-targets -- --deny warnings

- name: Clippy
run: cargo clippy --all-features --all-targets -- --deny warnings
run: cargo +stable clippy --all-features --all-targets -- --deny warnings

build:
needs: check
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub(crate) mod zenoh {
pubsub::{Publisher, Reliability, Subscriber},
qos::{CongestionControl, Priority},
query::{
ConsolidationMode, Parameters, Query, QueryTarget, Queryable, Reply, ReplyError,
Selector,
ConsolidationMode, Parameters, Query, QueryConsolidation, QueryTarget, Queryable,
Reply, ReplyError, Selector,
},
sample::{Sample, SampleKind},
scouting::{scout, Hello, Scout},
Expand Down
48 changes: 35 additions & 13 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
key_expr::KeyExpr,
macros::{build, downcast_or_new, enum_mapper, option_wrapper, wrapper},
qos::{CongestionControl, Priority},
utils::{generic, wait, IntoPyResult, IntoPython, MapInto},
utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto},
};

enum_mapper!(zenoh::query::QueryTarget: u8 {
Expand Down Expand Up @@ -53,6 +53,29 @@ impl ConsolidationMode {
const DEFAULT: Self = Self::Auto;
}

wrapper!(zenoh::query::QueryConsolidation: Clone);
downcast_or_new!(QueryConsolidation => ConsolidationMode);

#[pymethods]
impl QueryConsolidation {
#[classattr]
const AUTO: Self = Self(zenoh::query::QueryConsolidation::AUTO);
#[classattr]
const DEFAULT: Self = Self(zenoh::query::QueryConsolidation::DEFAULT);

#[new]
fn new(mode: Option<ConsolidationMode>) -> PyResult<Self> {
let Some(mode) = mode else {
return Ok(Self::DEFAULT);
};
Ok(Self(mode.into_rust().into()))
}

fn mode(&self) -> ConsolidationMode {
self.0.mode().into()
}
}

wrapper!(zenoh::query::Query: Clone);

#[pymethods]
Expand Down Expand Up @@ -308,7 +331,10 @@ downcast_or_new!(Parameters);
#[pymethods]
impl Parameters {
#[new]
pub(crate) fn new(obj: &Bound<PyAny>) -> PyResult<Self> {
pub(crate) fn new(obj: Option<&Bound<PyAny>>) -> PyResult<Self> {
let Some(obj) = obj else {
return Ok(Self(zenoh::query::Parameters::empty()));
};
if let Ok(map) = <HashMap<String, String>>::extract_bound(obj) {
return Ok(Self(map.into()));
}
Expand All @@ -319,24 +345,20 @@ impl Parameters {
self.0.is_empty()
}

fn contains_key(&self, key: String) -> bool {
self.0.contains_key(key)
}

#[pyo3(signature = (key, default = None))]
fn get(&self, key: String, default: Option<String>) -> Option<String> {
fn get(&self, key: &str, default: Option<String>) -> Option<String> {
self.0.get(key).map_into().or(default)
}

fn values(&self, key: String) -> Vec<&str> {
fn values(&self, key: &str) -> Vec<&str> {
self.0.values(key).collect()
}

fn insert(&mut self, key: String, value: String) -> Option<String> {
fn insert(&mut self, key: &str, value: &str) -> Option<String> {
self.0.insert(key, value)
}

fn remove(&mut self, key: String) -> Option<String> {
fn remove(&mut self, key: &str) -> Option<String> {
self.0.remove(key)
}

Expand All @@ -352,11 +374,11 @@ impl Parameters {
!self.0.is_empty()
}

fn __contains__(&self, key: String) -> bool {
self.contains_key(key)
fn __contains__(&self, key: &str) -> bool {
self.0.contains_key(key)
}

fn __getitem__(&self, key: String) -> Option<String> {
fn __getitem__(&self, key: &str) -> Option<String> {
self.get(key, None)
}

Expand Down
17 changes: 13 additions & 4 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use crate::{
macros::{build, with, wrapper},
pubsub::{Publisher, Reliability, Subscriber},
qos::{CongestionControl, Priority},
query::{ConsolidationMode, QueryTarget, Queryable, Reply, Selector},
query::{QueryConsolidation, QueryTarget, Queryable, Reply, Selector},
time::Timestamp,
utils::{wait, IntoPython, MapInto},
};

Expand Down Expand Up @@ -56,12 +57,14 @@ impl Session {
Ok(self.0.zid().into())
}

// TODO HLC

fn close(&self, py: Python) -> PyResult<()> {
wait(py, self.0.close())
}

fn is_closed(&self) -> bool {
self.0.is_closed()
}

fn undeclare(&self, obj: &Bound<PyAny>) -> PyResult<()> {
if let Ok(key_expr) = KeyExpr::from_py(obj) {
return wait(obj.py(), self.0.undeclare(key_expr.0));
Expand All @@ -70,6 +73,10 @@ impl Session {
Ok(())
}

fn new_timestamp(&self) -> Timestamp {
self.0.new_timestamp().into()
}

fn declare_keyexpr(
&self,
py: Python,
Expand Down Expand Up @@ -130,7 +137,9 @@ impl Session {
#[pyo3(from_py_with = "Selector::from_py")] selector: Selector,
handler: Option<&Bound<PyAny>>,
target: Option<QueryTarget>,
consolidation: Option<ConsolidationMode>,
#[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option<
QueryConsolidation,
>,
#[pyo3(from_py_with = "timeout")] timeout: Option<Duration>,
congestion_control: Option<CongestionControl>,
priority: Option<Priority>,
Expand Down
70 changes: 66 additions & 4 deletions src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,76 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::SystemTime;
use std::{
hash::{Hash, Hasher},
time::{Duration, SystemTime},
};

use pyo3::prelude::*;
use pyo3::{prelude::*, types::PyType};

use crate::macros::wrapper;
use crate::{macros::wrapper, utils::IntoPyResult};

wrapper!(zenoh::time::Timestamp: Clone, PartialEq, PartialOrd);
wrapper!(zenoh::time::TimestampId: Copy, Clone, PartialEq, PartialOrd);

#[pymethods]
impl TimestampId {
fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp) -> bool {
match op {
pyo3::pyclass::CompareOp::Lt => self < other,
pyo3::pyclass::CompareOp::Le => self <= other,
pyo3::pyclass::CompareOp::Eq => self == other,
pyo3::pyclass::CompareOp::Ne => self != other,
pyo3::pyclass::CompareOp::Gt => self > other,
pyo3::pyclass::CompareOp::Ge => self >= other,
}
}

fn __bytes__(&self) -> [u8; zenoh::time::TimestampId::MAX_SIZE] {
self.0.to_le_bytes()
}

fn __hash__(&self, py: Python) -> PyResult<isize> {
self.__bytes__().into_py(py).bind(py).hash()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}

fn __str__(&self) -> String {
format!("{}", self.0)
}
}

wrapper!(zenoh::time::Timestamp: Clone, PartialEq, PartialOrd, Hash);

#[pymethods]
impl Timestamp {
fn get_time(&self) -> SystemTime {
self.0.get_time().to_system_time()
}

fn get_id(&self) -> TimestampId {
(*self.0.get_id()).into()
}

fn get_diff_duration(&self, other: Timestamp) -> Duration {
self.0.get_diff_duration(&other.0)
}

fn to_string_rfc3339_lossy(&self) -> String {
self.0.to_string_rfc3339_lossy()
}

#[classmethod]
fn parse_rfc3339(_cls: &Bound<PyType>, s: &str) -> PyResult<Self> {
Ok(Self(
zenoh::time::Timestamp::parse_rfc3339(s)
.map_err(|err| err.cause)
.into_pyres()?,
))
}

fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp) -> bool {
match op {
pyo3::pyclass::CompareOp::Lt => self < other,
Expand All @@ -36,6 +92,12 @@ impl Timestamp {
}
}

fn __hash__(&self) -> u64 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
self.0.hash(&mut hasher);
hasher.finish()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
Expand Down
30 changes: 1 addition & 29 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,42 +93,14 @@ pub(crate) fn generic(cls: &Bound<PyType>, args: &Bound<PyAny>) -> PyObject {
.unbind()
}

pub(crate) struct TryProcessIter<'a, I, E> {
iter: I,
error: &'a mut Option<E>,
}

impl<I: Iterator<Item = Result<T, E>>, T, E> Iterator for TryProcessIter<'_, I, E> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
match self.iter.next() {
Some(Ok(x)) => Some(x),
Some(Err(err)) => {
*self.error = Some(err);
None
}
None => None,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.error.is_some() {
(0, Some(0))
} else {
self.iter.size_hint()
}
}
}

pub(crate) fn short_type_name<T: ?Sized>() -> &'static str {
let name = std::any::type_name::<T>();
name.rsplit_once("::").map_or(name, |(_, name)| name)
}

pub(crate) fn wait<T: Send>(
py: Python,
resolve: impl zenoh::Resolve<zenoh::Result<T>> + Send,
resolve: impl zenoh::Resolve<zenoh::Result<T>>,
) -> PyResult<T> {
py.allow_threads(|| resolve.wait()).into_pyres()
}
32 changes: 25 additions & 7 deletions zenoh/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,10 @@ _IntoKeyExpr = KeyExpr | str

@final
class Parameters:
def __new__(cls, parameters: dict[str, str] | str): ...
def __new__(cls, parameters: dict[str, str] | str | None = None): ...
def is_empty(self) -> bool:
"""Returns true if properties does not contain anything."""

def contains_key(self, key: str) -> bool:
"""Returns true if properties contains the specified key."""

def get(self, key: str, default: str | None = None) -> str | None:
"""Returns the value corresponding to the key."""

Expand Down Expand Up @@ -528,6 +525,8 @@ class Query:
@property
def key_expr(self) -> KeyExpr: ...
@property
def parameters(self) -> Parameters: ...
@property
def payload(self) -> ZBytes | None: ...
@property
def encoding(self) -> Encoding | None: ...
Expand Down Expand Up @@ -590,6 +589,16 @@ class Queryable(Generic[_H]):
@overload
def __iter__(self) -> Never: ...

@final
class QueryConsolidation:
AUTO: Self
DEFAULT: Self
def __new__(cls, mode: ConsolidationMode, /) -> Self: ...
@property
def mode(self) -> ConsolidationMode: ...

_IntoQueryConsolidation = ConsolidationMode

@final
class QueryTarget(Enum):
"""The kind of consolidation used."""
Expand Down Expand Up @@ -744,7 +753,16 @@ class Session:
Sessions are automatically closed when dropped, but you may want to use this function to handle errors or close the Session asynchronously.
"""

def is_closed(self) -> bool:
"""Check if the session has been closed."""

def undeclare(self, obj: KeyExpr): ...
def new_timestamp(self) -> Timestamp:
"""Get a new Timestamp from a Zenoh session.

The returned timestamp has the current time, with the session's runtime ZenohId.
"""

def declare_keyexpr(self, key_expr: _IntoKeyExpr):
"""Informs Zenoh that you intend to use the provided key_expr multiple times and that it should optimize its transmission.
The returned KeyExpr's internal structure may differ from what you would have obtained through a simple key_expr.try_into(), to save time on detecting the optimizations that have been associated with it.
Expand Down Expand Up @@ -781,7 +799,7 @@ class Session:
handler: _RustHandler[Reply] | None = None,
*,
target: QueryTarget | None = None,
consolidation: ConsolidationMode | None = None,
consolidation: _IntoQueryConsolidation | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand All @@ -801,7 +819,7 @@ class Session:
handler: _PythonHandler[Reply, _H],
*,
target: QueryTarget | None = None,
consolidation: ConsolidationMode | None = None,
consolidation: _IntoQueryConsolidation | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand All @@ -821,7 +839,7 @@ class Session:
handler: _PythonCallback[Reply],
*,
target: QueryTarget | None = None,
consolidation: ConsolidationMode | None = None,
consolidation: _IntoQueryConsolidation | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand Down
Loading