-
Notifications
You must be signed in to change notification settings - Fork 92
Add clean_session parameter and command support to MQTTDynamicSubscriber #110
base: develop
Are you sure you want to change the base?
Conversation
clean_session flag is set by default. Add support for following commands: 'subscribe' (topic subscribe) and 'unsubscribe' (topic unsubscribe) Other small updates
message["client_id"] = client_id | ||
message = "" | ||
for (client_id, mqtt) in self.mqtt_dict.iteritems(): | ||
if (calvinsys.can_read(mqtt)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove superfluous parentheses (in all if-statements, not only here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@condition(action_input=['client_id', 'uri', 'topic', 'qos']) | ||
def update_topic(self, client_id, uri, topic, qos): | ||
@condition(action_input=['client_id', 'uri', 'cmd', 'topic', 'qos']) | ||
def update_topic(self, client_id, uri, cmd, topic, qos): | ||
if (topic is None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Topic must contain at least one character.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
"type": "string" | ||
}, | ||
}, | ||
"required": ["topic"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Command would be better as an enum. Also, describe defaults (when qos and cmd missing.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
.format(topic, status[0])) | ||
|
||
if (not done and retry > 0): | ||
time.sleep(0.2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you need delay or wait, either use thread or delayed call (see other calvinsys for usage). This usage will freeze the entire runtime during the sleep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I said that we should have retries, I meant for the connection, but forgot that when using the loop_start reconnect is handled automatically. But what needs to be added is to resubscribe to all topics in the on_connect callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See notes.
self.data = [] | ||
clean_session = kwargs.get('clean_session', 'false').lower() == "true" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clean session that comes in should already be a boolean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -245,37 +264,89 @@ def on_log_debug(client, string): | |||
elif is_tls: | |||
_log.warning("TLS configuration is missing!") | |||
|
|||
self.client.connect_async(host=hostname, port=port) | |||
self.client.connect(host=hostname, port=port) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this from async. This would block the runtime, please change back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Add support for MQTTDynamicSubscriber actor migration Add migration test using public MQTT broker http://test.mosquitto.org/ Other minor updates
|
||
def on_disconnect(client, userdata, rc): | ||
_log.warning("MQTT broker {}:{} disconnected".format(hostname, port)) | ||
|
||
def on_message(client, userdata, message): | ||
_log.info("New message {}".format(message)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will be removed on the next commit
clean_session flag is set by default.
Add support for following commands: 'subscribe' (topic subscribe) and
'unsubscribe' (topic unsubscribe)
Other small updates