Skip to content

Commit

Permalink
#215 subscribe_many() returns error if slice lengths don't match. Add…
Browse files Browse the repository at this point in the history
…ed TopicMatcher::insert_many()
  • Loading branch information
fpagliughi committed May 25, 2024
1 parent 25059f5 commit 5e33a68
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 3 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v0.12.5](https://github.com/eclipse/paho.mqtt.rust/compare/v0.12.4..v0.12.5) - (unreleased)

- Added `TopicMatcher::insert_many()`
- [#216](https://github.com/eclipse/paho.mqtt.rust/issues/216) Deref QoS pointers for SubscribeMany and UnsubscribeMany in server response
- [#215](https://github.com/eclipse/paho.mqtt.rust/issues/215) Now `subscribe_many()` returns an error if slices not the same length. Also added `subscribe_many_same_qos()` to the clients.


## [v0.12.4](https://github.com/eclipse/paho.mqtt.rust/compare/v0.12.3..v0.12.4) - 2024-05-19

- Fixes for topic matching:
Expand Down
4 changes: 2 additions & 2 deletions examples/sync_consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn main() {
.finalize();

let subscriptions = ["test", "hello"];
let qos = [1, 1];
let qos = 1;

// Make the connection to the broker
match cli.connect(conn_opts) {
Expand All @@ -121,7 +121,7 @@ fn main() {
subscriptions, qos
);

cli.subscribe_many(&subscriptions, &qos)
cli.subscribe_many_same_qos(&subscriptions, qos)
.and_then(|rsp| {
rsp.subscribe_many_response()
.ok_or(mqtt::Error::General("Bad response"))
Expand Down
22 changes: 21 additions & 1 deletion src/async_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,8 +877,12 @@ impl AsyncClient {
{
let n = topics.len();

// The length of the slices must match
if n != qos.len() {
return SubscribeManyToken::from_error(-1);
}

let ver = self.mqtt_version();
// TOOD: Make sure topics & qos are same length (or use min)
let tok = Token::from_request(None, ServerRequest::SubscribeMany(n));
let mut rsp_opts = ResponseOptions::new(ver, tok.clone());
let topics = StringCollection::new(topics);
Expand All @@ -903,6 +907,22 @@ impl AsyncClient {
tok
}

/// Subscribes to multiple topics simultaneously using the same QoS
/// for all of them.
///
/// # Arguments
///
/// `topics` The collection of topic names
/// `qos` The quality of service requested for all messages
///
pub fn subscribe_many_same_qos<T>(&self, topics: &[T], qos: i32) -> SubscribeManyToken
where
T: AsRef<str>,
{
let qos = vec![qos; topics.len()];
self.subscribe_many(topics, &qos)
}

/// Subscribes to multiple topics simultaneously with options.
///
/// # Arguments
Expand Down
17 changes: 17 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,23 @@ impl Client {
self.cli.subscribe_many(topics, qos).wait_for(self.timeout)
}

/// Subscribes to multiple topics simultaneously using the same QoS
/// for all of them.
///
/// # Arguments
///
/// `topics` The collection of topic names
/// `qos` The quality of service requested for all messages
///
pub fn subscribe_many_same_qos<T>(&self, topics: &[T], qos: i32) -> Result<ServerResponse>
where
T: AsRef<str>,
{
self.cli
.subscribe_many_same_qos(topics, qos)
.wait_for(self.timeout)
}

/// Subscribes to multiple topics simultaneously with options.
///
/// # Arguments
Expand Down
27 changes: 27 additions & 0 deletions src/topic_matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,15 @@ impl<T> TopicMatcher<T> {
}
}

impl <T: Clone> TopicMatcher<T> {
/// Inserts multiple filters all with (a clone of) the same value.
pub fn insert_many<S: AsRef<str>>(&mut self, filters: &[S], val: T) {
for filter in filters {
self.insert(filter.as_ref(), val.clone());
}
}
}

// We manually implement Default, otherwise the derived one would
// require T: Default.

Expand Down Expand Up @@ -566,4 +575,22 @@ mod tests {
assert_eq!(n, 4);
}
}

#[test]
fn test_topic_matcher_many() {

let mut tm = TopicMatcher::new();
tm.insert("some/test/#", 99);
tm.insert_many(&[
"some/test/topic",
"some/+/topic",
"some/prod/topic",
], 42);

assert_eq!(tm.get("some/test/#"), Some(&99));
assert_eq!(tm.get("some/test/topic"), Some(&42));
assert_eq!(tm.get("some/+/topic"), Some(&42));
assert_eq!(tm.get("some/prod/topic"), Some(&42));
assert_eq!(tm.get("some/test/bubba"), None);
}
}

0 comments on commit 5e33a68

Please sign in to comment.