From c98920909dc22a5f0c51fe882d4138521f8956a1 Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sat, 5 Nov 2022 22:09:26 +0100 Subject: [PATCH 1/9] Manual reconnect implementation --- MQTTClient.py | 54 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/MQTTClient.py b/MQTTClient.py index 7359f89..ec7aef2 100644 --- a/MQTTClient.py +++ b/MQTTClient.py @@ -4,7 +4,7 @@ import paho.mqtt.client as mqtt -def is_number(s): +def is_number(s) -> bool: try: float(s) return True @@ -21,36 +21,48 @@ def is_number(s): return False class MQTTClient(multiprocessing.Process): - def __init__(self, messageQ, commandQ, config): + def __init__(self, messageQ, commandQ, config) -> None: self.logger = logging.getLogger('RFLinkGW.MQTTClient') self.logger.info("Starting...") multiprocessing.Process.__init__(self) self.__messageQ = messageQ self.__commandQ = commandQ - + self.client_connected = False + self.connect_retry_counter = 0 self.mqttDataPrefix = config['mqtt_prefix'] self.mqttDataFormat = config['mqtt_format'] self._mqttConn = mqtt.Client(client_id='RFLinkGateway') self._mqttConn.username_pw_set(config['mqtt_user'], config['mqtt_password']) self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) + self._mqttConn.on_disconnect = self._on_disconnect self._mqttConn.on_publish = self._on_publish self._mqttConn.on_message = self._on_message + self._mqttConn.on_connect = self._on_connect - def close(self): + def close(self) -> None: self.logger.info("Closing connection") self._mqttConn.disconnect() - def _on_disconnect(self, client, userdata, rc): + def _on_connect(self,client,userdata,flags,rc) -> None: + if rc == 0: + self.client_connected = True + self.connect_retry_counter = 0 + self.logger.info("Client connected") + self._mqttConn.subscribe("%s/+/+/WRITE/+" % self.mqttDataPrefix) + + + def _on_disconnect(self, client, userdata, rc) -> None: if rc != 0: self.logger.error("Unexpected disconnection.") + self.client_connected = False self._mqttConn.reconnect() - def _on_publish(self, client, userdata, mid): + def _on_publish(self, client, userdata, mid) -> None: self.logger.debug("Message " + str(mid) + " published.") - def _on_message(self, client, userdata, message): + def _on_message(self, client, userdata, message) -> None: self.logger.debug("Message received: %s" % (message)) data = message.topic.replace(self.mqttDataPrefix + "/", "").split("/") @@ -65,7 +77,7 @@ def _on_message(self, client, userdata, message): } self.__commandQ.put(data_out) - def publish(self, task): + def publish(self, task) -> None: topic = "%s/%s" % (self.mqttDataPrefix, task['topic']) if self.mqttDataFormat == 'json': @@ -74,19 +86,27 @@ def publish(self, task): else: task['payload'] = '{"value": "' + str(task['payload']) + '"}' try: - self._mqttConn.publish(topic, payload=task['payload']) - self.logger.debug('Sending:%s' % (task)) + result = self._mqttConn.publish(topic, payload=task['payload']) + self.logger.debug('Sending message %s :%s, result:%s' % (result.mid, task, result.rc)) + if result.rc != 0: + raise Exception("Send failed") except Exception as e: self.logger.error('Publish problem: %s' % (e)) self.__messageQ.put(task) - def run(self): - self._mqttConn.subscribe("%s/+/+/W/+" % self.mqttDataPrefix) + def run(self) -> NoReturn: while True: - if not self.__messageQ.empty(): - task = self.__messageQ.get() - if task['method'] == 'publish': - self.publish(task) + if self.client_connected == False: + #TODO Add reconnection limit + time.sleep (1+2*self.connect_retry_counter) + self.logger.error('Reconnecting...') + self._mqttConn.reconnect() + self.connect_retry_counter += 1 else: - time.sleep(0.01) + if not self.__messageQ.empty(): + task = self.__messageQ.get() + if task['method'] == 'publish': + self.publish(task) + else: + time.sleep(0.1) self._mqttConn.loop() From e2da15329f72fa75aeccf0ea2714f113e56c5f09 Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sat, 5 Nov 2022 22:16:06 +0100 Subject: [PATCH 2/9] small refactor --- MQTTClient.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/MQTTClient.py b/MQTTClient.py index ec7aef2..c72804b 100644 --- a/MQTTClient.py +++ b/MQTTClient.py @@ -34,13 +34,15 @@ def __init__(self, messageQ, commandQ, config) -> None: self.mqttDataFormat = config['mqtt_format'] self._mqttConn = mqtt.Client(client_id='RFLinkGateway') self._mqttConn.username_pw_set(config['mqtt_user'], config['mqtt_password']) - self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) self._mqttConn.on_disconnect = self._on_disconnect self._mqttConn.on_publish = self._on_publish self._mqttConn.on_message = self._on_message self._mqttConn.on_connect = self._on_connect + self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) + + def close(self) -> None: self.logger.info("Closing connection") self._mqttConn.disconnect() @@ -94,12 +96,12 @@ def publish(self, task) -> None: self.logger.error('Publish problem: %s' % (e)) self.__messageQ.put(task) - def run(self) -> NoReturn: + def run(self): while True: if self.client_connected == False: #TODO Add reconnection limit time.sleep (1+2*self.connect_retry_counter) - self.logger.error('Reconnecting...') + self.logger.error('Reconnecting, try:%s' % (self.connect_retry_counter+1)) self._mqttConn.reconnect() self.connect_retry_counter += 1 else: From b2f12e92244d30ad9dcbd92ff23d7a3ccf62e1e5 Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sat, 5 Nov 2022 22:20:53 +0100 Subject: [PATCH 3/9] Serial refactor --- SerialProcess.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/SerialProcess.py b/SerialProcess.py index cc328ff..28724a6 100644 --- a/SerialProcess.py +++ b/SerialProcess.py @@ -6,7 +6,7 @@ class SerialProcess(multiprocessing.Process): - def __init__(self, messageQ, commandQ, config): + def __init__(self, messageQ, commandQ, config) -> None: self.logger = logging.getLogger('RFLinkGW.SerialProcessing') self.logger.info("Starting...") @@ -25,11 +25,11 @@ def __init__(self, messageQ, commandQ, config): self.processing_wdir = config['rflink_wdir_output_params'] - def close(self): + def close(self) -> None: self.sp.close() self.logger.debug('Serial closed') - def prepare_output(self, data_in): + def prepare_output(self, data_in) -> list: out = [] data = data_in.decode("ascii").replace(";\r\n", "").split(";") self.logger.debug("Received message:%s" % (data)) @@ -80,7 +80,7 @@ def prepare_input(self, task): self.logger.debug('Sending to serial:%s' % (out_str)) return out_str - def connect(self): + def connect(self) -> None: self.logger.info('Connecting to serial') while not self.sp.isOpen(): try: From 7d9785692b083002fd58fe135c7ab0eebd5cbd8e Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sun, 6 Nov 2022 13:30:17 +0100 Subject: [PATCH 4/9] extracting config to external method --- MQTTClient.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/MQTTClient.py b/MQTTClient.py index c72804b..430d198 100644 --- a/MQTTClient.py +++ b/MQTTClient.py @@ -24,25 +24,26 @@ class MQTTClient(multiprocessing.Process): def __init__(self, messageQ, commandQ, config) -> None: self.logger = logging.getLogger('RFLinkGW.MQTTClient') self.logger.info("Starting...") - + self.config=config multiprocessing.Process.__init__(self) self.__messageQ = messageQ self.__commandQ = commandQ self.client_connected = False self.connect_retry_counter = 0 - self.mqttDataPrefix = config['mqtt_prefix'] - self.mqttDataFormat = config['mqtt_format'] + self.mqttDataPrefix = self.config['mqtt_prefix'] + self.mqttDataFormat = self.config['mqtt_format'] self._mqttConn = mqtt.Client(client_id='RFLinkGateway') - self._mqttConn.username_pw_set(config['mqtt_user'], config['mqtt_password']) + self._mqttConn.username_pw_set(self.config['mqtt_user'], self.config['mqtt_password']) self._mqttConn.on_disconnect = self._on_disconnect self._mqttConn.on_publish = self._on_publish self._mqttConn.on_message = self._on_message self._mqttConn.on_connect = self._on_connect + self.connect(self.config) + + def connect (self,config): self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) - - def close(self) -> None: self.logger.info("Closing connection") self._mqttConn.disconnect() @@ -59,7 +60,7 @@ def _on_disconnect(self, client, userdata, rc) -> None: if rc != 0: self.logger.error("Unexpected disconnection.") self.client_connected = False - self._mqttConn.reconnect() + self.connect(self.config) def _on_publish(self, client, userdata, mid) -> None: self.logger.debug("Message " + str(mid) + " published.") @@ -67,7 +68,7 @@ def _on_publish(self, client, userdata, mid) -> None: def _on_message(self, client, userdata, message) -> None: self.logger.debug("Message received: %s" % (message)) - data = message.topic.replace(self.mqttDataPrefix + "/", "").split("/") + data = message.topic.replace(self.config['mqtt_prefix'] + "/", "").split("/") data_out = { 'method': 'subscribe', 'topic': message.topic, @@ -80,8 +81,7 @@ def _on_message(self, client, userdata, message) -> None: self.__commandQ.put(data_out) def publish(self, task) -> None: - topic = "%s/%s" % (self.mqttDataPrefix, task['topic']) - + topic = "%s/%s" % (self.config['mqtt_prefix'], task['topic']) if self.mqttDataFormat == 'json': if is_number(task['payload']): task['payload'] = '{"value": ' + str(task['payload']) + '}' @@ -102,7 +102,7 @@ def run(self): #TODO Add reconnection limit time.sleep (1+2*self.connect_retry_counter) self.logger.error('Reconnecting, try:%s' % (self.connect_retry_counter+1)) - self._mqttConn.reconnect() + self.connect(self.config) self.connect_retry_counter += 1 else: if not self.__messageQ.empty(): From b2e7237a630446e66c59a2b5b9824a2c8ecb8fd5 Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sun, 6 Nov 2022 13:33:29 +0100 Subject: [PATCH 5/9] . --- MQTTClient.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/MQTTClient.py b/MQTTClient.py index 430d198..03a27f0 100644 --- a/MQTTClient.py +++ b/MQTTClient.py @@ -49,11 +49,10 @@ def close(self) -> None: self._mqttConn.disconnect() def _on_connect(self,client,userdata,flags,rc) -> None: - if rc == 0: - self.client_connected = True - self.connect_retry_counter = 0 - self.logger.info("Client connected") - self._mqttConn.subscribe("%s/+/+/WRITE/+" % self.mqttDataPrefix) + self.client_connected = True + self.connect_retry_counter = 0 + self.logger.info("Client connected") + self._mqttConn.subscribe("%s/+/+/WRITE/+" % self.mqttDataPrefix) def _on_disconnect(self, client, userdata, rc) -> None: From 58c9622b4cfa445fab7ad16437fba0625898827c Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sun, 6 Nov 2022 13:38:33 +0100 Subject: [PATCH 6/9] connect exception handling --- MQTTClient.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/MQTTClient.py b/MQTTClient.py index 03a27f0..046874c 100644 --- a/MQTTClient.py +++ b/MQTTClient.py @@ -43,7 +43,10 @@ def __init__(self, messageQ, commandQ, config) -> None: def connect (self,config): - self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) + try: + result = self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) + except: + self.logger.error("problem with connect:%s" % result) def close(self) -> None: self.logger.info("Closing connection") self._mqttConn.disconnect() From cd51a76c12a635229434790b4487409e4133325f Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sun, 6 Nov 2022 13:42:51 +0100 Subject: [PATCH 7/9] . --- MQTTClient.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/MQTTClient.py b/MQTTClient.py index 046874c..114c9ce 100644 --- a/MQTTClient.py +++ b/MQTTClient.py @@ -42,11 +42,11 @@ def __init__(self, messageQ, commandQ, config) -> None: self.connect(self.config) - def connect (self,config): + def connect (self,config) -> None: try: - result = self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) + self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) except: - self.logger.error("problem with connect:%s" % result) + self.logger.error("problem with connect") def close(self) -> None: self.logger.info("Closing connection") self._mqttConn.disconnect() From 6e1d6f3100875587cf7b829e7174b8a09e4f731d Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sun, 6 Nov 2022 13:47:27 +0100 Subject: [PATCH 8/9] . --- MQTTClient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/MQTTClient.py b/MQTTClient.py index 114c9ce..0fe5c3c 100644 --- a/MQTTClient.py +++ b/MQTTClient.py @@ -45,8 +45,8 @@ def __init__(self, messageQ, commandQ, config) -> None: def connect (self,config) -> None: try: self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120) - except: - self.logger.error("problem with connect") + except Exception as e: + self.logger.error("problem with connect: %s" % e) def close(self) -> None: self.logger.info("Closing connection") self._mqttConn.disconnect() From cbe321e1aa2aed2b9dfac6bf99f4c7a21632de2d Mon Sep 17 00:00:00 2001 From: Marcin Kaptur Date: Sun, 6 Nov 2022 13:59:13 +0100 Subject: [PATCH 9/9] Loop handling --- MQTTClient.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/MQTTClient.py b/MQTTClient.py index 0fe5c3c..74e38fc 100644 --- a/MQTTClient.py +++ b/MQTTClient.py @@ -106,11 +106,10 @@ def run(self): self.logger.error('Reconnecting, try:%s' % (self.connect_retry_counter+1)) self.connect(self.config) self.connect_retry_counter += 1 + if not self.__messageQ.empty(): + task = self.__messageQ.get() + if task['method'] == 'publish': + self.publish(task) else: - if not self.__messageQ.empty(): - task = self.__messageQ.get() - if task['method'] == 'publish': - self.publish(task) - else: - time.sleep(0.1) + time.sleep(0.1) self._mqttConn.loop()