Skip to content

Commit

Permalink
Merge pull request #23 from Iture:Iture/issue21
Browse files Browse the repository at this point in the history
Reconnection logic
  • Loading branch information
Iture authored Nov 6, 2022
2 parents c10f65f + cbe321e commit 52967e5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 25 deletions.
65 changes: 44 additions & 21 deletions MQTTClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import paho.mqtt.client as mqtt

def is_number(s):
def is_number(s) -> bool:
try:
float(s)
return True
Expand All @@ -21,39 +21,56 @@ def is_number(s):
return False

class MQTTClient(multiprocessing.Process):
def __init__(self, messageQ, commandQ, config):
def __init__(self, messageQ, commandQ, config) -> None:
self.logger = logging.getLogger('RFLinkGW.MQTTClient')
self.logger.info("Starting...")

self.config=config
multiprocessing.Process.__init__(self)
self.__messageQ = messageQ
self.__commandQ = commandQ

self.mqttDataPrefix = config['mqtt_prefix']
self.mqttDataFormat = config['mqtt_format']
self.client_connected = False
self.connect_retry_counter = 0
self.mqttDataPrefix = self.config['mqtt_prefix']
self.mqttDataFormat = self.config['mqtt_format']
self._mqttConn = mqtt.Client(client_id='RFLinkGateway')
self._mqttConn.username_pw_set(config['mqtt_user'], config['mqtt_password'])
self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120)
self._mqttConn.username_pw_set(self.config['mqtt_user'], self.config['mqtt_password'])

self._mqttConn.on_disconnect = self._on_disconnect
self._mqttConn.on_publish = self._on_publish
self._mqttConn.on_message = self._on_message
self._mqttConn.on_connect = self._on_connect
self.connect(self.config)


def close(self):
def connect (self,config) -> None:
try:
self._mqttConn.connect(config['mqtt_host'], port=config['mqtt_port'], keepalive=120)
except Exception as e:
self.logger.error("problem with connect: %s" % e)
def close(self) -> None:
self.logger.info("Closing connection")
self._mqttConn.disconnect()

def _on_disconnect(self, client, userdata, rc):
def _on_connect(self,client,userdata,flags,rc) -> None:
self.client_connected = True
self.connect_retry_counter = 0
self.logger.info("Client connected")
self._mqttConn.subscribe("%s/+/+/WRITE/+" % self.mqttDataPrefix)


def _on_disconnect(self, client, userdata, rc) -> None:
if rc != 0:
self.logger.error("Unexpected disconnection.")
self._mqttConn.reconnect()
self.client_connected = False
self.connect(self.config)

def _on_publish(self, client, userdata, mid):
def _on_publish(self, client, userdata, mid) -> None:
self.logger.debug("Message " + str(mid) + " published.")

def _on_message(self, client, userdata, message):
def _on_message(self, client, userdata, message) -> None:
self.logger.debug("Message received: %s" % (message))

data = message.topic.replace(self.mqttDataPrefix + "/", "").split("/")
data = message.topic.replace(self.config['mqtt_prefix'] + "/", "").split("/")
data_out = {
'method': 'subscribe',
'topic': message.topic,
Expand All @@ -65,28 +82,34 @@ def _on_message(self, client, userdata, message):
}
self.__commandQ.put(data_out)

def publish(self, task):
topic = "%s/%s" % (self.mqttDataPrefix, task['topic'])

def publish(self, task) -> None:
topic = "%s/%s" % (self.config['mqtt_prefix'], task['topic'])
if self.mqttDataFormat == 'json':
if is_number(task['payload']):
task['payload'] = '{"value": ' + str(task['payload']) + '}'
else:
task['payload'] = '{"value": "' + str(task['payload']) + '"}'
try:
self._mqttConn.publish(topic, payload=task['payload'])
self.logger.debug('Sending:%s' % (task))
result = self._mqttConn.publish(topic, payload=task['payload'])
self.logger.debug('Sending message %s :%s, result:%s' % (result.mid, task, result.rc))
if result.rc != 0:
raise Exception("Send failed")
except Exception as e:
self.logger.error('Publish problem: %s' % (e))
self.__messageQ.put(task)

def run(self):
self._mqttConn.subscribe("%s/+/+/W/+" % self.mqttDataPrefix)
while True:
if self.client_connected == False:
#TODO Add reconnection limit
time.sleep (1+2*self.connect_retry_counter)
self.logger.error('Reconnecting, try:%s' % (self.connect_retry_counter+1))
self.connect(self.config)
self.connect_retry_counter += 1
if not self.__messageQ.empty():
task = self.__messageQ.get()
if task['method'] == 'publish':
self.publish(task)
else:
time.sleep(0.01)
time.sleep(0.1)
self._mqttConn.loop()
8 changes: 4 additions & 4 deletions SerialProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class SerialProcess(multiprocessing.Process):
def __init__(self, messageQ, commandQ, config):
def __init__(self, messageQ, commandQ, config) -> None:
self.logger = logging.getLogger('RFLinkGW.SerialProcessing')

self.logger.info("Starting...")
Expand All @@ -25,11 +25,11 @@ def __init__(self, messageQ, commandQ, config):

self.processing_wdir = config['rflink_wdir_output_params']

def close(self):
def close(self) -> None:
self.sp.close()
self.logger.debug('Serial closed')

def prepare_output(self, data_in):
def prepare_output(self, data_in) -> list:
out = []
data = data_in.decode("ascii").replace(";\r\n", "").split(";")
self.logger.debug("Received message:%s" % (data))
Expand Down Expand Up @@ -80,7 +80,7 @@ def prepare_input(self, task):
self.logger.debug('Sending to serial:%s' % (out_str))
return out_str

def connect(self):
def connect(self) -> None:
self.logger.info('Connecting to serial')
while not self.sp.isOpen():
try:
Expand Down

0 comments on commit 52967e5

Please sign in to comment.