Skip to content
This repository has been archived by the owner on Jan 17, 2020. It is now read-only.

Commit

Permalink
Feature: Added unsubscribe method to MqttClient (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
TheBestJohn authored and Ravi Teja committed Dec 23, 2018
1 parent 587c868 commit e8c2c8f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ impl From<Request> for Packet {
Request::Ping => Packet::Pingreq,
Request::Disconnect => Packet::Disconnect,
Request::Subscribe(subscribe) => Packet::Subscribe(subscribe),
Request::Unsubscribe(unsubscribe) => Packet::Unsubscribe(unsubscribe),
_ => unimplemented!(),
}
}
Expand Down
17 changes: 16 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::error::{ClientError, ConnectError};
use crate::MqttOptions;
use crossbeam_channel;
use futures::{sync::mpsc, Future, Sink};
use mqtt311::{PacketIdentifier, Publish, QoS, Subscribe, SubscribeTopic};
use mqtt311::{PacketIdentifier, Publish, QoS, Subscribe, Unsubscribe, SubscribeTopic};
use std::sync::Arc;

pub mod connection;
Expand All @@ -26,6 +26,7 @@ pub enum Notification {
pub enum Request {
Publish(Publish),
Subscribe(Subscribe),
Unsubscribe(Unsubscribe),
PubAck(PacketIdentifier),
PubRec(PacketIdentifier),
PubRel(PacketIdentifier),
Expand Down Expand Up @@ -119,6 +120,20 @@ impl MqttClient {
Ok(())
}

pub fn unsubscribe<S>(&mut self, topic: S) -> Result<(), ClientError>
where
S: Into<String>,
{
let unsubscribe = Unsubscribe {
pkid: PacketIdentifier::zero(),
topics: vec![topic.into()],
};

let tx = &mut self.request_tx;
tx.send(Request::Unsubscribe(unsubscribe)).wait()?;
Ok(())
}

pub fn pause(&mut self) -> Result<(), ClientError> {
let tx = &mut self.command_tx;
tx.send(Command::Pause).wait()?;
Expand Down
1 change: 1 addition & 0 deletions src/client/mqttstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl MqttState {
Packet::Pingresp => self.handle_incoming_pingresp(),
Packet::Publish(publish) => self.handle_incoming_publish(publish.clone()),
Packet::Suback(_pkid) => Ok((Notification::None, Request::None)),
Packet::Unsuback(_pkid) => Ok((Notification::None, Request::None)),
Packet::Puback(pkid) => self.handle_incoming_puback(pkid),
Packet::Pubrec(pkid) => self.handle_incoming_pubrec(pkid),
Packet::Pubrel(pkid) => self.handle_incoming_pubrel(pkid),
Expand Down

0 comments on commit e8c2c8f

Please sign in to comment.