From 447b4eb3c1371c67b1abb2c5cca30ca0a8239e7a Mon Sep 17 00:00:00 2001 From: TheBestJohn Date: Sat, 22 Dec 2018 05:23:19 -0500 Subject: [PATCH] Added unsubscribe method to MqttClient --- src/client/connection.rs | 1 + src/client/mod.rs | 17 ++++++++++++++++- src/client/mqttstate.rs | 1 + 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/client/connection.rs b/src/client/connection.rs index 97ad413..135288e 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -456,6 +456,7 @@ impl From for Packet { Request::Ping => Packet::Pingreq, Request::Disconnect => Packet::Disconnect, Request::Subscribe(subscribe) => Packet::Subscribe(subscribe), + Request::Unsubscribe(unsubscribe) => Packet::Unsubscribe(unsubscribe), _ => unimplemented!(), } } diff --git a/src/client/mod.rs b/src/client/mod.rs index ee004d2..938a1d0 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -2,7 +2,7 @@ use crate::error::{ClientError, ConnectError}; use crate::MqttOptions; use crossbeam_channel; use futures::{sync::mpsc, Future, Sink}; -use mqtt311::{PacketIdentifier, Publish, QoS, Subscribe, SubscribeTopic}; +use mqtt311::{PacketIdentifier, Publish, QoS, Subscribe, Unsubscribe, SubscribeTopic}; use std::sync::Arc; pub mod connection; @@ -26,6 +26,7 @@ pub enum Notification { pub enum Request { Publish(Publish), Subscribe(Subscribe), + Unsubscribe(Unsubscribe), PubAck(PacketIdentifier), PubRec(PacketIdentifier), PubRel(PacketIdentifier), @@ -119,6 +120,20 @@ impl MqttClient { Ok(()) } + pub fn unsubscribe(&mut self, topic: S) -> Result<(), ClientError> + where + S: Into, + { + let unsubscribe = Unsubscribe { + pkid: PacketIdentifier::zero(), + topics: vec![topic.into()], + }; + + let tx = &mut self.request_tx; + tx.send(Request::Unsubscribe(unsubscribe)).wait()?; + Ok(()) + } + pub fn pause(&mut self) -> Result<(), ClientError> { let tx = &mut self.command_tx; tx.send(Command::Pause).wait()?; diff --git a/src/client/mqttstate.rs b/src/client/mqttstate.rs index 22371d8..fb9b826 100644 --- a/src/client/mqttstate.rs +++ b/src/client/mqttstate.rs @@ -91,6 +91,7 @@ impl MqttState { Packet::Pingresp => self.handle_incoming_pingresp(), Packet::Publish(publish) => self.handle_incoming_publish(publish.clone()), Packet::Suback(_pkid) => Ok((Notification::None, Request::None)), + Packet::Unsuback(_pkid) => Ok((Notification::None, Request::None)), Packet::Puback(pkid) => self.handle_incoming_puback(pkid), Packet::Pubrec(pkid) => self.handle_incoming_pubrec(pkid), Packet::Pubrel(pkid) => self.handle_incoming_pubrel(pkid),