Skip to content

Commit

Permalink
V03 issue415 (#723)
Browse files Browse the repository at this point in the history
* starting on #415... syntax works, but connects fail.

* issue #415 websocket connection takes more time... need to wait for
establishment to complete.

* for #415 exchangeSplit implement lost (regreesion)
corrected with new topicDerive in postformat parent class, same for v02, and v03.

* tmate always hangs manually invoked tests in spite of not being
selected. disabling.

---------

Co-authored-by: petersilva <[email protected]>
  • Loading branch information
petersilva and petersilva authored Jul 27, 2023
1 parent 10d730a commit 8ae69d2
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 52 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ jobs:
# Enable tmate debugging of manually-triggered workflows if the input option was provided
# https://github.com/marketplace/actions/debugging-with-tmate
# 2023/07/22 PAS, removed because when manually invoking test it always enables in spite
# of the github box not being checked, so cannot invoke the tests manually.
#
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: ${{ github.event.inputs.debug_enabled }}
#if: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.debug_enabled }}
#- name: Setup tmate session
# uses: mxschmitt/action-tmate@v3
# if: ${{ github.event.inputs.debug_enabled }}
# #if: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.debug_enabled }}

- name: Limit ${{ matrix.which_test }} test.
run: |
Expand Down
5 changes: 0 additions & 5 deletions .github/workflows/flow_basic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ jobs:
travis/flow_autoconfig.sh
travis/ssh_localhost.sh
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: ${{ github.event.inputs.debug_enabled }}
#if: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.debug_enabled }}

- name: Add and Remove configs,
run: |
pwd
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __str__(self):
if self.url.hostname:
s += '@' + self.url.hostname
if self.url.port:
s += ':' + self.url.port
s += ':' + str(self.url.port)
if self.url.path:
s += self.url.path

Expand Down
20 changes: 12 additions & 8 deletions sarracenia/moth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
}

def ProtocolPresent(p) -> bool:
if ( p in ['amqp', 'amqps' ] ) and sarracenia.extras['amqp']['present']:
if ( p[0:4] in ['amqp'] ) and sarracenia.extras['amqp']['present']:
return True
if ( p in ['mqtt', 'mqtts' ] ) and sarracenia.extras['mqtt']['present']:
if ( p[0:4] in ['mqtt'] ) and sarracenia.extras['mqtt']['present']:
return True
if p in sarracenia.extras:
logger.critical( f"support for {p} missing, please install python packages: {' '.join(sarracenia.extras[p]['modules_needed'])}" )
Expand Down Expand Up @@ -218,9 +218,11 @@ def subFactory(props) -> 'Moth':
return None

for sc in Moth.__subclasses__():
if (props['broker'].url.scheme == sc.__name__.lower()) or (
(props['broker'].url.scheme[0:-1] == sc.__name__.lower()) and
(props['broker'].url.scheme[-1] == 's')):
driver=sc.__name__.lower()
scheme=props['broker'].url.scheme
if (scheme == driver) or \
( (scheme[0:-1] == driver) and (scheme[-1] in [ 's', 'w' ])) or \
( (scheme[0:-2] == driver) and (scheme[-2] == 'ws')):
return sc(props, True)
logger.error('broker intialization failure')
return None
Expand All @@ -240,9 +242,11 @@ def pubFactory(props) -> 'Moth':
return None

for sc in Moth.__subclasses__():
if (props['broker'].url.scheme == sc.__name__.lower()) or (
(props['broker'].url.scheme[0:-1] == sc.__name__.lower()) and
(props['broker'].url.scheme[-1] == 's')):
driver=sc.__name__.lower()
scheme=props['broker'].url.scheme
if (scheme == driver) or \
( (scheme[0:-1] == driver) and (scheme[-1] in [ 's', 'w' ])) or \
( (scheme[0:-2] == driver) and (scheme[-2] == 'ws')):
return sc(props, False)

# ProtocolPresent test should ensure that we never get here...
Expand Down
60 changes: 48 additions & 12 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class MQTT(Moth):
Message Queue Telemetry Transport support.
talks to an MQTT broker. Tested with mosquitto. requires MQTTv5
- broker url schemes: mqtt, mqtts, mqttw, mqttws
Sarracenia Concept mapping from AMQP:
AMQP -> MQTT topic hierarcy mapping:
Expand Down Expand Up @@ -143,7 +145,7 @@ def __init__(self, options, is_subscriber):

if 'logLevel' in self.o['settings'][me]:
logger.setLevel(self.o['logLevel'].upper())

self.proto_version = paho.mqtt.client.MQTTv5

if 'receiveMaximum' in self.o and type(self.o['receiveMaximum']) is not int:
Expand Down Expand Up @@ -298,8 +300,13 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client:

self.connect_in_progress = True

client = paho.mqtt.client.Client( userdata=self, \
client_id=cid, protocol=paho.mqtt.client.MQTTv5 )
if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \
(self.o['broker'].url.scheme[-1] == 'w' ) :
client = paho.mqtt.client.Client( userdata=self, transport="websockets", \
client_id=cid, protocol=paho.mqtt.client.MQTTv5 )
else:
client = paho.mqtt.client.Client( userdata=self, \
client_id=cid, protocol=paho.mqtt.client.MQTTv5 )

client.connected = False
client.on_connect = MQTT.__sub_on_connect
Expand Down Expand Up @@ -367,9 +374,15 @@ def __getSetup(self):
"paho library using auto_ack. may lose data every crash or restart."
)

self.client.connect_async( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \
self.client.connect( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \
clean_start=False, properties=props )
self.client.enable_logger(logger)
while (self.connect_in_progress) or (self.subscribe_in_progress > 0):
self.client.loop()
time.sleep(0.1)
if self.please_stop:
break
logger.info("waiting for subscription to be set up.")
self.client.loop_start()
self.connected=True
break
Expand Down Expand Up @@ -432,8 +445,16 @@ def __putSetup(self):
if self.o['message_ttl'] > 0:
props.MessageExpiryInterval = int(self.o['message_ttl'])

self.client = paho.mqtt.client.Client(
protocol=self.proto_version, userdata=self)
if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \
(self.o['broker'].url.scheme[-1] == 'w' ) :
self.client = paho.mqtt.client.Client( userdata=self, transport="websockets", \
protocol=self.proto_version )
else:
self.client = paho.mqtt.client.Client( userdata=self, \
protocol=self.proto_version )

#self.client = paho.mqtt.client.Client(
# protocol=self.proto_version, userdata=self)

self.client.enable_logger(logger)
self.client.on_connect = MQTT.__pub_on_connect
Expand All @@ -447,13 +468,21 @@ def __putSetup(self):
self.connect_in_progress = True
res = self.client.connect_async(self.o['broker'].url.hostname,
port=self.__sslClientSetup(),
properties=props)
logger.info('connecting to %s, res=%s' %
(self.o['broker'].url.hostname, res))
properties=props)
logger.info('connecting to %s, res=%s' % (self.o['broker'].url.hostname, res))

self.client.loop_start()
self.connected=True
break

while self.connect_in_progress:
time.sleep(0.1)
if self.please_stop:
break
logger.info( f"waiting for connection to {self.o['broker']}")
self.client.loop()

if not self.connect_in_progress:
self.connected=True
break

except Exception as err:
logger.error("failed to {} with {}".format(
Expand Down Expand Up @@ -504,6 +533,8 @@ def getCleanUp(self):
clean_start=False, properties=props )
while self.connect_in_progress:
myclient.loop(0.1)
if self.please_stop:
break
myclient.disconnect()
logger.info('instance deletion for %02d done' % i)

Expand Down Expand Up @@ -635,6 +666,9 @@ def putNewMessage(self,
if not self.connected:
self.__putSetup()

# The caller probably doesn't expect the message to get modified by this method, so use a copy of the message
body = copy.deepcopy(body)

postFormat = body['_format']

if '_deleteOnPost' in body:
Expand Down Expand Up @@ -675,8 +709,9 @@ def putNewMessage(self,
props.ContentType = PostFormat.content_type( postFormat )

try:
raw_body, headers, content_type = PostFormat.exportAny( body, postFormat, [exchange]+self.o['topicPrefix'], self.o )
raw_body, headers, content_type = PostFormat.exportAny( body, postFormat, self.o['topicPrefix'], self.o )
# FIXME: might
logger.critical( f" headers:{headers} format: {postFormat}, pfx: {self.o['topicPrefix']} " )
topic = '/'.join(headers['topic'])

# url-quote wildcard characters in topics.
Expand Down Expand Up @@ -710,6 +745,7 @@ def putNewMessage(self,
logger.info("published mid={} ack_pending={} {} to under: {} ".format(
info.mid, ack_pending, body, topic))
return True #success...
logger.error( f"publish failed {paho.mqtt.client.error_string(info.rc)} ")

except Exception as ex:
logger.error('Exception details: ', exc_info=True)
Expand Down
32 changes: 32 additions & 0 deletions sarracenia/postformat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,38 @@ def exportAny(msg, post_format='v03', topicPrefix=[ 'v03' ], options={ 'post_for

return None, None, None

def topicDerive(msg, options ) -> list:
"""
Sarracenia standard topic derivation.
https://metpx.github.io/sarracenia/Explanation/Concepts.html#amqp-v09-rabbitmq-settings
"""

if options['broker'].url.scheme.startswith('mqtt'):
if ( 'exchange' in options ) and ( 'topicPrefix' in options ):
if 'exchangeSplit' in options and options['exchangeSplit'] > 1:
idx = sum( bytearray(msg['identity']['value'], 'ascii')) % len(options['exchange'])
exchange = options['exchange'][idx]
else:
exchange = options['exchange'][0]
topic_prefix = [exchange] + options['topicPrefix']
topic_separator='/'
else:
topic_prefix = options['topicPrefix']
topic_separator='.'

if 'topic' in options:
topic = options['topic'].split(topic_separator)
else:
if 'relPath' in msg:
topic = topic_prefix + msg['relPath'].split('/')[0:-1]
else:
topic = topic_prefix

return topic



# test for v04 first, because v03 may claim all other JSON.
import sarracenia.postformat.wis
import sarracenia.postformat.v03
Expand Down
6 changes: 2 additions & 4 deletions sarracenia/postformat/v02.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def exportMine(body, options) -> (str, dict, str):
given a v03 (internal) message, produce an encoded version.
"""
v2m = v2wrapper.Message(body)

# v2wrapp
for h in [
'pubTime', 'baseUrl', 'fileOp', 'relPath', 'size',
Expand All @@ -148,8 +148,6 @@ def exportMine(body, options) -> (str, dict, str):
if h in v2m.headers:
del v2m.headers[h]

if ( 'broker' in options ) and ( 'exchange' in options ) and \
options['broker'].url.scheme.startswith('mqtt'):
v2m.headers['topic'] = options['exchange'] + v2m.headers['topic']
v2m.headers['topic'] = PostFormat.topicDerive( body, options )

return v2m.notice, v2m.headers, V02.content_type()
19 changes: 1 addition & 18 deletions sarracenia/postformat/v03.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,6 @@ def exportMine(body,options) -> (str, dict, str):
"""
raw_body = json.dumps(body)

topic_separator='.'

if options['broker'].url.scheme.startswith('mqtt'):
if ( 'exchange' in options ) and ( 'topicPrefix' in options ):
topic_prefix = options['exchange'] + options['topicPrefix']
topic_separator='/'
else:
topic_prefix = options['topicPrefix']

if 'topic' in options:
topic = options['topic'].split(topic_separator)
else:
if 'relPath' in body:
topic = topic_prefix + body['relPath'].split('/')[0:-1]
else:
topic = topic_prefix

headers = { 'topic': topic }
headers = { 'topic': PostFormat.topicDerive(body,options) }

return raw_body, headers, V03.content_type()

0 comments on commit 8ae69d2

Please sign in to comment.