From 4230ef457ed93ceddd9cafcfd78520a24bc0f163 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 26 Aug 2024 15:59:34 -0400 Subject: [PATCH 1/4] fix 1023 topicCopy not working (perhaps... not tested yet.) --- sarracenia/__init__.py | 10 ++++++---- sarracenia/moth/amqp.py | 6 +++++- sarracenia/postformat/__init__.py | 14 +++++++------- sarracenia/postformat/v02.py | 4 ++-- sarracenia/postformat/v03.py | 4 ++-- sarracenia/postformat/wis.py | 9 ++++++--- 6 files changed, 28 insertions(+), 19 deletions(-) diff --git a/sarracenia/__init__.py b/sarracenia/__init__.py index 5e8624218..148d1268f 100755 --- a/sarracenia/__init__.py +++ b/sarracenia/__init__.py @@ -500,6 +500,7 @@ def copyDict(msg, d): def deriveSource(msg,o): """ set msg['source'] field as appropriate for given message and options (o) + This is used on message receipt to set up fields prior to processing. """ source=None if 'source' in o: @@ -518,11 +519,12 @@ def deriveSource(msg,o): msg['source'] = source msg['_deleteOnPost'] |= set(['source']) - def deriveTopics(msg,o,topic,separator='.'): + def deriveTopics(msg,o,inbound_topic,separator='.'): """ - derive subtopic, topicPrefix, and topic fields based on message and options. + derive subtopic, topicPrefix, and inbound_topic fields based on message and options. + This is used on message receipt to set up fields prior to processing. """ - msg_topic = topic.split(separator) + msg_topic = inbound_topic.split(separator) # topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017 if 'topicCopy' in o and o['topicCopy']: topicOverride=True @@ -541,7 +543,7 @@ def deriveTopics(msg,o,topic,separator='.'): topicOverride=True if topicOverride: - msg['topic'] = topic + msg['topic'] = inbound_topic msg['_deleteOnPost'] |= set( ['topic'] ) diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py index 9b23d11d2..46c5ad6f4 100755 --- a/sarracenia/moth/amqp.py +++ b/sarracenia/moth/amqp.py @@ -644,6 +644,10 @@ def putNewMessage(self, version = body['_format'] + topicOverride=None + if 'topic' in body: + topicOverride=body['topic'] + if '_deleteOnPost' in body: # FIXME: need to delete because building entire JSON object at once. # makes this routine alter the message. Ideally, would use incremental @@ -688,7 +692,7 @@ def putNewMessage(self, else: deliv_mode = 2 - raw_body, headers, content_type = PostFormat.exportAny( body, version, self.o['topicPrefix'], self.o ) + raw_body, headers, content_type = PostFormat.exportAny( body, version, self.o['topicPrefix'], self.o, topicOverride ) topic = '.'.join(headers['topic']) topic = topic.replace('#', '%23') diff --git a/sarracenia/postformat/__init__.py b/sarracenia/postformat/__init__.py index 64fa55d19..172e71d2c 100644 --- a/sarracenia/postformat/__init__.py +++ b/sarracenia/postformat/__init__.py @@ -64,18 +64,18 @@ def importAny(payload, headers, content_type, options ) -> sarracenia.Message: pass @staticmethod - def exportAny(msg, post_format='v03', topicPrefix=[ 'v03' ], options={ 'post_format': 'v03', 'topicPrefix':'v03' } ) -> (str, dict, str): + def exportAny(msg, post_format='v03', topicPrefix=[ 'v03' ], options={ 'post_format': 'v03', 'topicPrefix':'v03' }, topicOverride=None ) -> (str, dict, str): """ return a tuple of the encoded message body, a headers dict, and content_type and a completed topic as a list as one header. """ for sc in PostFormat.__subclasses__(): if post_format == sc.__name__.lower(): - return sc.exportMine( msg, options ) + return sc.exportMine( msg, options, topicOverride ) return None, None, None - def topicDerive(msg, options ) -> list: + def topicDerive(msg, options, topicOverride=None ) -> list: """ Sarracenia standard topic derivation. @@ -96,11 +96,11 @@ def topicDerive(msg, options ) -> list: topic_prefix = options['topicPrefix'] topic_separator='.' - if 'topic' in msg: - if type(msg['topic']) is list: - topic = msg['topic'] + if topicOverride in msg: + if topicOverride is list: + topic = topicOverride else: - topic = msg['topic'].split(topic_separator) + topic = topicOverride.split(topic_separator) elif 'topic' in options and options['topic'] and (type(options['topic']) is not list): topic = options['topic'].split(topic_separator) else: diff --git a/sarracenia/postformat/v02.py b/sarracenia/postformat/v02.py index 92b523a6d..e9c18d1f5 100644 --- a/sarracenia/postformat/v02.py +++ b/sarracenia/postformat/v02.py @@ -144,7 +144,7 @@ def importMine(body, headers, options) -> sarracenia.Message: return msg @staticmethod - def exportMine(body, options) -> (str, dict, str): + def exportMine(body, options, topicOverride=None) -> (str, dict, str): """ given a v03 (internal) message, produce an encoded version. """ @@ -158,6 +158,6 @@ def exportMine(body, options) -> (str, dict, str): if h in v2m.headers: del v2m.headers[h] - v2m.headers['topic'] = PostFormat.topicDerive( body, options ) + v2m.headers['topic'] = PostFormat.topicDerive( body, options, topicOverride ) return v2m.notice, v2m.headers, V02.content_type() diff --git a/sarracenia/postformat/v03.py b/sarracenia/postformat/v03.py index cef765705..06dba2235 100644 --- a/sarracenia/postformat/v03.py +++ b/sarracenia/postformat/v03.py @@ -93,12 +93,12 @@ def importMine(body, headers, options) -> sarracenia.Message: return msg @staticmethod - def exportMine(body,options) -> (str, dict, str): + def exportMine(body,options,topicOverride=None) -> (str, dict, str): """ given a v03 (internal) message, produce an encoded version. """ raw_body = json.dumps(body) - headers = { 'topic': PostFormat.topicDerive(body,options) } + headers = { 'topic': PostFormat.topicDerive(body,options,topicOverride) } return raw_body, headers, V03.content_type() diff --git a/sarracenia/postformat/wis.py b/sarracenia/postformat/wis.py index f5851edc1..068271484 100644 --- a/sarracenia/postformat/wis.py +++ b/sarracenia/postformat/wis.py @@ -115,7 +115,7 @@ def importMine(body, headers, options) -> sarracenia.Message: return msg @staticmethod - def exportMine(body, options) -> (str, dict, str): + def exportMine(body, options, topicOverride=None) -> (str, dict, str): """ given a v03 (internal) message, produce an encoded version. """ @@ -125,13 +125,16 @@ def exportMine(body, options) -> (str, dict, str): if literal in body: GeoJSONBody[literal] = body[literal] - if 'topic' in body: - topic = body['topic'].split('/') + if topicOverride: + topic=topicOverride elif 'topic' in options: topic=options['topic'].split('/') else: topic= [] + if type(topic) is not list: + topic = topic.split('/') + headers = { 'topic' : topic } """ topicPrefix and body['subtopic'] could be used to build a topic... From d72b92056704d67894a3822022020dd5e1b1fbd3 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Tue, 27 Aug 2024 11:22:12 -0400 Subject: [PATCH 2/4] ugh found by Reid... working now --- sarracenia/postformat/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarracenia/postformat/__init__.py b/sarracenia/postformat/__init__.py index 172e71d2c..24367b827 100644 --- a/sarracenia/postformat/__init__.py +++ b/sarracenia/postformat/__init__.py @@ -96,7 +96,7 @@ def topicDerive(msg, options, topicOverride=None ) -> list: topic_prefix = options['topicPrefix'] topic_separator='.' - if topicOverride in msg: + if topicOverride: if topicOverride is list: topic = topicOverride else: From 7367b5263e92fac23ce3f272bdcf5941adf65502 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Tue, 27 Aug 2024 13:48:44 -0400 Subject: [PATCH 3/4] correct and simplify topic listification --- sarracenia/postformat/__init__.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sarracenia/postformat/__init__.py b/sarracenia/postformat/__init__.py index 24367b827..60239b2ac 100644 --- a/sarracenia/postformat/__init__.py +++ b/sarracenia/postformat/__init__.py @@ -97,19 +97,19 @@ def topicDerive(msg, options, topicOverride=None ) -> list: topic_separator='.' if topicOverride: - if topicOverride is list: - topic = topicOverride - else: - topic = topicOverride.split(topic_separator) - elif 'topic' in options and options['topic'] and (type(options['topic']) is not list): - topic = options['topic'].split(topic_separator) + topic = topicOverride + elif 'topic' in options and options['topic']: + topic = options['topic'] + elif 'relPath' in msg: + topic = topic_prefix + msg['relPath'].split('/')[0:-1] + elif 'subtopic' in msg: + topic = topic_prefix + msg['subtopic'] else: - if 'relPath' in msg: - topic = topic_prefix + msg['relPath'].split('/')[0:-1] - elif 'subtopic' in msg: - topic = topic_prefix + msg['subtopic'] - else: - topic = topic_prefix + topic = topic_prefix + + if type(topic) is not list: + topic = topic.split(topic_separator) + return topic From bca8bb9a74a042e70a72d7f41403dda68b2a6306 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 18 Sep 2024 16:42:04 -0400 Subject: [PATCH 4/4] if topic is malformed, just do not set anything --- sarracenia/__init__.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/sarracenia/__init__.py b/sarracenia/__init__.py index 148d1268f..652bc99e7 100755 --- a/sarracenia/__init__.py +++ b/sarracenia/__init__.py @@ -526,25 +526,17 @@ def deriveTopics(msg,o,inbound_topic,separator='.'): """ msg_topic = inbound_topic.split(separator) # topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017 - if 'topicCopy' in o and o['topicCopy']: - topicOverride=True - else: - topicOverride=False - if 'relPath' in msg: - path_topic = o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/') - - if msg_topic != path_topic: - topicOverride=True - - # set subtopic if possible. - if msg_topic[0:len(o['topicPrefix'])] == o['topicPrefix']: - msg['subtopic'] = msg_topic[len(o['topicPrefix']):] - else: - topicOverride=True + topicOverride = 'topicCopy' in o and o['topicCopy'] if topicOverride: msg['topic'] = inbound_topic msg['_deleteOnPost'] |= set( ['topic'] ) + else: + # if the topic isn't as expected by Sarracenia, leave subtopic unset. + # set subtopic if possible. + if msg_topic[0:len(o['topicPrefix'])] == o['topicPrefix']: + msg['subtopic'] = msg_topic[len(o['topicPrefix']):] + msg['_deleteOnPost'] |= set(['subtopic']) def dumps(msg) -> str: