Skip to content
This repository has been archived by the owner on Aug 13, 2020. It is now read-only.

Add clean_session parameter and command support to MQTTDynamicSubscriber #110

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions calvin/actorstore/systemactors/net/MQTTDynamicSubscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,57 +35,68 @@ class MQTTDynamicSubscriber(Actor):
"auth": { "username": <username "password": <password> },
"will": { "topic": <topic>, "payload": <payload> },
"transport": <tcp or websocket>,
"clean_session": <true|false>
}

Input:
client_id : MQTT client ID
uri : MQTT broker URI (format: schema://host:port)
cmd : command keyword ('subscribe'|'unsubscribe')
topic : topic to subscribe to
qos : MQTT qos
Output:
message : dictionary {"topic": <topic>, "payload": <payload>, "client_id": <client id>}
"""

@manage(['settings', 'mqtt_dict'])
@manage(['settings', 'mqtt_dict', 'queue'])
def init(self, settings):
if not settings:
settings = {}
self.settings = settings
self.mqtt_dict = {}
self.queue = []

"""
Read first available MQTT client message
Read all available MQTT clients for messages and store them in a FIFO queue
The reader will only read the first message in the queue.

@note The rest of the messages are expected to be read at the next readings
"""

@stateguard(lambda self:
@stateguard(lambda self: self.queue or
any(calvinsys.can_read(mqtt) for mqtt in self.mqtt_dict.itervalues()))
@condition(action_output=['message'])
def read_message(self):
client_id, mqtt = next((client_id, mqtt)
for (client_id, mqtt) in self.mqtt_dict.iteritems()
if calvinsys.can_read(mqtt))
message = calvinsys.read(mqtt)
# add client id to the message
message["client_id"] = client_id
message = ""
for (client_id, mqtt) in self.mqtt_dict.iteritems():
if (calvinsys.can_read(mqtt)):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove superfluous parentheses (in all if-statements, not only here)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

message = calvinsys.read(mqtt)
# add client id to the message
message["client_id"] = client_id
self.queue.append(message)
if (self.queue):
message = self.queue.pop(0)
return (message,)

"""
Update MQTT subscribed topics for specific MQTT client
"""

@condition(action_input=['client_id', 'uri', 'topic', 'qos'])
def update_topic(self, client_id, uri, topic, qos):
@condition(action_input=['client_id', 'uri', 'cmd', 'topic', 'qos'])
def update_topic(self, client_id, uri, cmd, topic, qos):
if (topic is None):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Topic must contain at least one character.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

_log.warning("Topic is missing!")
return

if (not client_id in self.mqtt_dict.keys()):
self.mqtt_dict[client_id] = calvinsys.open(self, "mqtt.subscribe",
client_id=client_id,
topics=[topic],
uri=uri,
qos=qos,
**self.settings)
calvinsys.write(self.mqtt_dict[client_id], {"topic":topic, "qos":qos})
calvinsys.write(self.mqtt_dict[client_id],
{"cmd": cmd, "topic":topic, "qos":qos})

action_priority = (update_topic, read_message)
requires = ['mqtt.subscribe']
121 changes: 96 additions & 25 deletions calvinextras/calvinsys/web/mqtt/Subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,22 +146,40 @@ class Subscribe(base_calvinsys_object.BaseCalvinsysObject):
}

can_write_schema = {
"description": "Does nothing, always return true",
"description": "Always return true, allowing configuration of MQTT client",
"type": "boolean"
}

write_schema = {
"description": "Does nothing"
"description": "Update topic subscriptions",
"type": "object",
"properties": {
"topic": {
"type": "string"
},
"qos": {
"type": "integer"
},
"cmd": {
"type": "string"
},
},
"required": ["topic"]
Copy link

@olaan olaan Jan 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Command would be better as an enum. Also, describe defaults (when qos and cmd missing.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


}
CMD_SUBSCRIBE = "subscribe"
CMD_UNSUBSCRIBE = "unsubscribe"

def init(self, topics, uri=None, hostname=None, port=1883, qos=0, client_id='', will=None, auth=None, tls=None, transport='tcp', payload_only=False, **kwargs):
def init(self, topics, uri=None, hostname=None, port=1883, qos=0, client_id='',
will=None, auth=None, tls=None, transport='tcp', payload_only=False,
**kwargs):

def on_connect(client, userdata, flags, rc):
if rc != 0:
_log.warning("Connection to MQTT broker {}:{} failed".format(hostname, port))
else :
_log.info("Connected to MQTT broker {}:{}".format(hostname, port))
client.subscribe(self.topics)
client.subscribe([(topic, qos) for topic, qos in self.topics.iteritems()])

def on_disconnect(client, userdata, rc):
_log.warning("MQTT broker {}:{} disconnected".format(hostname, port))
Expand Down Expand Up @@ -219,10 +237,11 @@ def on_log_debug(client, string):

_log.info("TLS: {}".format(tls))
self.payload_only = payload_only
self.topics = [(topic.encode("ascii"), qos) for topic in topics]
self.topics = {(topic.encode("ascii")) : qos for topic in list(set(topics))}
self.data = []
clean_session = kwargs.get('clean_session', 'false').lower() == "true"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean session that comes in should already be a boolean.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


self.client = mqtt.Client(client_id=client_id, transport=transport)
self.client = mqtt.Client(client_id=client_id, transport=transport, clean_session=clean_session)
self.client.on_connect = on_connect
self.client.on_disconnect = on_disconnect
self.client.on_message = on_message
Expand All @@ -245,37 +264,89 @@ def on_log_debug(client, string):
elif is_tls:
_log.warning("TLS configuration is missing!")

self.client.connect_async(host=hostname, port=port)
self.client.connect(host=hostname, port=port)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this from async. This would block the runtime, please change back.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

self.client.loop_start()

def can_write(self):
return True

def write(self, data):
ret = True
cmd = data.get("cmd", Subscribe.CMD_SUBSCRIBE)
topic = data.get("topic", "").encode("ascii")
if (not topic):
_log.error("The topic is missing!")
return False
qos = data.get("qos", 0)
update_topic = True
topic_index = -1
# check if topic already exist
for idx in range(len(self.topics)):
(t, q) = self.topics[idx]
if t == topic:
if (q == qos):
update_topic = False
exist = topic in self.topics.iterkeys()

if (cmd == Subscribe.CMD_SUBSCRIBE):
if (not exist or self.topics[topic] != qos):
ret = self._subscribe(topic, qos)
else:
_log.debug("Subscription to topic '{}' already exist!")
elif (cmd == Subscribe.CMD_UNSUBSCRIBE):
if (exist):
ret = self._unsubscribe(topic)
else:
_log.error("Unknown topic!")
ret = False
else:
_log.error("Unknown command: {}!", cmd)
ret = False
if (ret):
_log.debug("Command {}({},[{}]) successfully finished".format(cmd, topic, qos))
return ret

def _subscribe(self, topic, qos):
retry = 10
ret = False
while True:
done = True
try:
status = self.client.subscribe((topic, qos))
if (status[0] == mqtt.MQTT_ERR_SUCCESS):
self.topics[topic] = qos
ret = True
elif (status[0] == mqtt.MQTT_ERR_NO_CONN):
_log.warn("No connection to the MQTT broker")
done = False
else:
topic_index = idx
_log.error("Failed to subscribe topic: ({}, {}) error code {}"
.format(topic, qos, status[0]))
except ValueError:
_log.error("Topic or QOS incorrect!")

if (not done and retry > 0):
time.sleep(0.2)
retry -= 1
else:
break

if (update_topic):
status = self.client.subscribe([(topic, qos)])
return ret

def _unsubscribe(self, topic):
retry = 10
ret = False
while True:
done = True
# assume topic is always correct
status = self.client.unsubscribe(topic)
if (status[0] == mqtt.MQTT_ERR_SUCCESS):
if (topic_index >= 0):
self.topics[topic_index] = (topic, qos)
else:
self.topics.append((topic, qos))
del self.topics[topic]
ret = True
elif (status[0] == mqtt.MQTT_ERR_NO_CONN):
_log.warn("No connection to the MQTT broker")
done = False
else:
_log.error("Failed to update topic: ({}, {}) Check MQTT logs"
.format(topic, qos))
_log.error("Failed to unsubscribe topic: ({}) error code {}"
.format(topic, status[0]))

if (not done and retry > 0):
time.sleep(0.2)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you need delay or wait, either use thread or delayed call (see other calvinsys for usage). This usage will freeze the entire runtime during the sleep.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I said that we should have retries, I meant for the connection, but forgot that when using the loop_start reconnect is handled automatically. But what needs to be added is to resubscribe to all topics in the on_connect callback.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

retry -= 1
else:
break
return ret

def can_read(self):
return bool(self.data)
Expand Down