diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 552d07a3f..963ef8ae6 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -59,6 +59,8 @@ class MQTT(Moth): Message Queue Telemetry Transport support. talks to an MQTT broker. Tested with mosquitto. requires MQTTv5 + - broker url schemes: mqtt, mqtts, mqttw, mqttws + Sarracenia Concept mapping from AMQP: AMQP -> MQTT topic hierarcy mapping: @@ -372,9 +374,15 @@ def __getSetup(self): "paho library using auto_ack. may lose data every crash or restart." ) - self.client.connect_async( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \ + self.client.connect( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \ clean_start=False, properties=props ) self.client.enable_logger(logger) + while (self.connect_in_progress) or (self.subscribe_in_progress > 0): + self.client.loop() + time.sleep(0.1) + if self.please_stop: + break + logger.info("waiting for subscription to be set up.") self.client.loop_start() self.connected=True break @@ -460,18 +468,21 @@ def __putSetup(self): self.connect_in_progress = True res = self.client.connect_async(self.o['broker'].url.hostname, port=self.__sslClientSetup(), - properties=props) + properties=props) logger.info('connecting to %s, res=%s' % (self.o['broker'].url.hostname, res)) self.client.loop_start() while self.connect_in_progress: time.sleep(0.1) + if self.please_stop: + break logger.info( f"waiting for connection to {self.o['broker']}") self.client.loop() - self.connected=True - break + if not self.connect_in_progress: + self.connected=True + break except Exception as err: logger.error("failed to {} with {}".format( @@ -522,6 +533,8 @@ def getCleanUp(self): clean_start=False, properties=props ) while self.connect_in_progress: myclient.loop(0.1) + if self.please_stop: + break myclient.disconnect() logger.info('instance deletion for %02d done' % i) @@ -696,8 +709,9 @@ def putNewMessage(self, props.ContentType = PostFormat.content_type( postFormat ) try: - raw_body, headers, content_type = PostFormat.exportAny( body, postFormat, [exchange]+self.o['topicPrefix'], self.o ) + raw_body, headers, content_type = PostFormat.exportAny( body, postFormat, self.o['topicPrefix'], self.o ) # FIXME: might + logger.critical( f" headers:{headers} format: {postFormat}, pfx: {self.o['topicPrefix']} " ) topic = '/'.join(headers['topic']) # url-quote wildcard characters in topics. diff --git a/sarracenia/postformat/__init__.py b/sarracenia/postformat/__init__.py index 7ad37e3c7..13c02d2b4 100644 --- a/sarracenia/postformat/__init__.py +++ b/sarracenia/postformat/__init__.py @@ -75,6 +75,38 @@ def exportAny(msg, post_format='v03', topicPrefix=[ 'v03' ], options={ 'post_for return None, None, None + def topicDerive(msg, options ) -> list: + """ + Sarracenia standard topic derivation. + + https://metpx.github.io/sarracenia/Explanation/Concepts.html#amqp-v09-rabbitmq-settings + """ + + if options['broker'].url.scheme.startswith('mqtt'): + if ( 'exchange' in options ) and ( 'topicPrefix' in options ): + if 'exchangeSplit' in options and options['exchangeSplit'] > 1: + idx = sum( bytearray(msg['identity']['value'], 'ascii')) % len(options['exchange']) + exchange = options['exchange'][idx] + else: + exchange = options['exchange'][0] + topic_prefix = [exchange] + options['topicPrefix'] + topic_separator='/' + else: + topic_prefix = options['topicPrefix'] + topic_separator='.' + + if 'topic' in options: + topic = options['topic'].split(topic_separator) + else: + if 'relPath' in msg: + topic = topic_prefix + msg['relPath'].split('/')[0:-1] + else: + topic = topic_prefix + + return topic + + + # test for v04 first, because v03 may claim all other JSON. import sarracenia.postformat.wis import sarracenia.postformat.v03 diff --git a/sarracenia/postformat/v02.py b/sarracenia/postformat/v02.py index 49a7fbd0f..996df758b 100644 --- a/sarracenia/postformat/v02.py +++ b/sarracenia/postformat/v02.py @@ -139,7 +139,7 @@ def exportMine(body, options) -> (str, dict, str): given a v03 (internal) message, produce an encoded version. """ v2m = v2wrapper.Message(body) - + # v2wrapp for h in [ 'pubTime', 'baseUrl', 'fileOp', 'relPath', 'size', @@ -148,8 +148,6 @@ def exportMine(body, options) -> (str, dict, str): if h in v2m.headers: del v2m.headers[h] - if ( 'broker' in options ) and ( 'exchange' in options ) and \ - options['broker'].url.scheme.startswith('mqtt'): - v2m.headers['topic'] = options['exchange'] + v2m.headers['topic'] + v2m.headers['topic'] = PostFormat.topicDerive( body, options ) return v2m.notice, v2m.headers, V02.content_type() diff --git a/sarracenia/postformat/v03.py b/sarracenia/postformat/v03.py index 0bea6ee09..80dcbd63c 100644 --- a/sarracenia/postformat/v03.py +++ b/sarracenia/postformat/v03.py @@ -86,23 +86,6 @@ def exportMine(body,options) -> (str, dict, str): """ raw_body = json.dumps(body) - topic_separator='.' - - if options['broker'].url.scheme.startswith('mqtt'): - if ( 'exchange' in options ) and ( 'topicPrefix' in options ): - topic_prefix = options['exchange'] + options['topicPrefix'] - topic_separator='/' - else: - topic_prefix = options['topicPrefix'] - - if 'topic' in options: - topic = options['topic'].split(topic_separator) - else: - if 'relPath' in body: - topic = topic_prefix + body['relPath'].split('/')[0:-1] - else: - topic = topic_prefix - - headers = { 'topic': topic } + headers = { 'topic': PostFormat.topicDerive(body,options) } return raw_body, headers, V03.content_type()