Skip to content

Commit

Permalink
for #415 exchangeSplit implement lost (regreesion)
Browse files Browse the repository at this point in the history
corrected with new topicDerive in postformat parent class, same for v02, and v03.
  • Loading branch information
petersilva committed Jul 22, 2023
1 parent 2c757e0 commit 945bc00
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 27 deletions.
24 changes: 19 additions & 5 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class MQTT(Moth):
Message Queue Telemetry Transport support.
talks to an MQTT broker. Tested with mosquitto. requires MQTTv5
- broker url schemes: mqtt, mqtts, mqttw, mqttws
Sarracenia Concept mapping from AMQP:
AMQP -> MQTT topic hierarcy mapping:
Expand Down Expand Up @@ -372,9 +374,15 @@ def __getSetup(self):
"paho library using auto_ack. may lose data every crash or restart."
)

self.client.connect_async( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \
self.client.connect( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \
clean_start=False, properties=props )
self.client.enable_logger(logger)
while (self.connect_in_progress) or (self.subscribe_in_progress > 0):
self.client.loop()
time.sleep(0.1)
if self.please_stop:
break
logger.info("waiting for subscription to be set up.")
self.client.loop_start()
self.connected=True
break
Expand Down Expand Up @@ -460,18 +468,21 @@ def __putSetup(self):
self.connect_in_progress = True
res = self.client.connect_async(self.o['broker'].url.hostname,
port=self.__sslClientSetup(),
properties=props)
properties=props)
logger.info('connecting to %s, res=%s' % (self.o['broker'].url.hostname, res))

self.client.loop_start()

while self.connect_in_progress:
time.sleep(0.1)
if self.please_stop:
break
logger.info( f"waiting for connection to {self.o['broker']}")
self.client.loop()

self.connected=True
break
if not self.connect_in_progress:
self.connected=True
break

except Exception as err:
logger.error("failed to {} with {}".format(
Expand Down Expand Up @@ -522,6 +533,8 @@ def getCleanUp(self):
clean_start=False, properties=props )
while self.connect_in_progress:
myclient.loop(0.1)
if self.please_stop:
break
myclient.disconnect()
logger.info('instance deletion for %02d done' % i)

Expand Down Expand Up @@ -696,8 +709,9 @@ def putNewMessage(self,
props.ContentType = PostFormat.content_type( postFormat )

try:
raw_body, headers, content_type = PostFormat.exportAny( body, postFormat, [exchange]+self.o['topicPrefix'], self.o )
raw_body, headers, content_type = PostFormat.exportAny( body, postFormat, self.o['topicPrefix'], self.o )
# FIXME: might
logger.critical( f" headers:{headers} format: {postFormat}, pfx: {self.o['topicPrefix']} " )
topic = '/'.join(headers['topic'])

# url-quote wildcard characters in topics.
Expand Down
32 changes: 32 additions & 0 deletions sarracenia/postformat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,38 @@ def exportAny(msg, post_format='v03', topicPrefix=[ 'v03' ], options={ 'post_for

return None, None, None

def topicDerive(msg, options ) -> list:
"""
Sarracenia standard topic derivation.
https://metpx.github.io/sarracenia/Explanation/Concepts.html#amqp-v09-rabbitmq-settings
"""

if options['broker'].url.scheme.startswith('mqtt'):
if ( 'exchange' in options ) and ( 'topicPrefix' in options ):
if 'exchangeSplit' in options and options['exchangeSplit'] > 1:
idx = sum( bytearray(msg['identity']['value'], 'ascii')) % len(options['exchange'])
exchange = options['exchange'][idx]
else:
exchange = options['exchange'][0]
topic_prefix = [exchange] + options['topicPrefix']
topic_separator='/'
else:
topic_prefix = options['topicPrefix']
topic_separator='.'

if 'topic' in options:
topic = options['topic'].split(topic_separator)
else:
if 'relPath' in msg:
topic = topic_prefix + msg['relPath'].split('/')[0:-1]
else:
topic = topic_prefix

return topic



# test for v04 first, because v03 may claim all other JSON.
import sarracenia.postformat.wis
import sarracenia.postformat.v03
Expand Down
6 changes: 2 additions & 4 deletions sarracenia/postformat/v02.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def exportMine(body, options) -> (str, dict, str):
given a v03 (internal) message, produce an encoded version.
"""
v2m = v2wrapper.Message(body)

# v2wrapp
for h in [
'pubTime', 'baseUrl', 'fileOp', 'relPath', 'size',
Expand All @@ -148,8 +148,6 @@ def exportMine(body, options) -> (str, dict, str):
if h in v2m.headers:
del v2m.headers[h]

if ( 'broker' in options ) and ( 'exchange' in options ) and \
options['broker'].url.scheme.startswith('mqtt'):
v2m.headers['topic'] = options['exchange'] + v2m.headers['topic']
v2m.headers['topic'] = PostFormat.topicDerive( body, options )

return v2m.notice, v2m.headers, V02.content_type()
19 changes: 1 addition & 18 deletions sarracenia/postformat/v03.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,6 @@ def exportMine(body,options) -> (str, dict, str):
"""
raw_body = json.dumps(body)

topic_separator='.'

if options['broker'].url.scheme.startswith('mqtt'):
if ( 'exchange' in options ) and ( 'topicPrefix' in options ):
topic_prefix = options['exchange'] + options['topicPrefix']
topic_separator='/'
else:
topic_prefix = options['topicPrefix']

if 'topic' in options:
topic = options['topic'].split(topic_separator)
else:
if 'relPath' in body:
topic = topic_prefix + body['relPath'].split('/')[0:-1]
else:
topic = topic_prefix

headers = { 'topic': topic }
headers = { 'topic': PostFormat.topicDerive(body,options) }

return raw_body, headers, V03.content_type()

0 comments on commit 945bc00

Please sign in to comment.