Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue1023 take2 topicCopy fix #1191

Open
wants to merge 4 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ def copyDict(msg, d):
def deriveSource(msg,o):
"""
set msg['source'] field as appropriate for given message and options (o)
This is used on message receipt to set up fields prior to processing.
"""
source=None
if 'source' in o:
Expand All @@ -518,11 +519,12 @@ def deriveSource(msg,o):
msg['source'] = source
msg['_deleteOnPost'] |= set(['source'])

def deriveTopics(msg,o,topic,separator='.'):
def deriveTopics(msg,o,inbound_topic,separator='.'):
"""
derive subtopic, topicPrefix, and topic fields based on message and options.
derive subtopic, topicPrefix, and inbound_topic fields based on message and options.
This is used on message receipt to set up fields prior to processing.
"""
msg_topic = topic.split(separator)
msg_topic = inbound_topic.split(separator)
# topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017
if 'topicCopy' in o and o['topicCopy']:
topicOverride=True
Expand All @@ -541,7 +543,7 @@ def deriveTopics(msg,o,topic,separator='.'):
topicOverride=True

if topicOverride:
msg['topic'] = topic
msg['topic'] = inbound_topic
msg['_deleteOnPost'] |= set( ['topic'] )


Expand Down
6 changes: 5 additions & 1 deletion sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,10 @@ def putNewMessage(self,
version = body['_format']


topicOverride=None
if 'topic' in body:
topicOverride=body['topic']

if '_deleteOnPost' in body:
# FIXME: need to delete because building entire JSON object at once.
# makes this routine alter the message. Ideally, would use incremental
Expand Down Expand Up @@ -688,7 +692,7 @@ def putNewMessage(self,
else:
deliv_mode = 2

raw_body, headers, content_type = PostFormat.exportAny( body, version, self.o['topicPrefix'], self.o )
raw_body, headers, content_type = PostFormat.exportAny( body, version, self.o['topicPrefix'], self.o, topicOverride )

topic = '.'.join(headers['topic'])
topic = topic.replace('#', '%23')
Expand Down
14 changes: 7 additions & 7 deletions sarracenia/postformat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ def importAny(payload, headers, content_type, options ) -> sarracenia.Message:
pass

@staticmethod
def exportAny(msg, post_format='v03', topicPrefix=[ 'v03' ], options={ 'post_format': 'v03', 'topicPrefix':'v03' } ) -> (str, dict, str):
def exportAny(msg, post_format='v03', topicPrefix=[ 'v03' ], options={ 'post_format': 'v03', 'topicPrefix':'v03' }, topicOverride=None ) -> (str, dict, str):
"""
return a tuple of the encoded message body, a headers dict, and content_type
and a completed topic as a list as one header.
"""
for sc in PostFormat.__subclasses__():
if post_format == sc.__name__.lower():
return sc.exportMine( msg, options )
return sc.exportMine( msg, options, topicOverride )

return None, None, None

def topicDerive(msg, options ) -> list:
def topicDerive(msg, options, topicOverride=None ) -> list:
"""
Sarracenia standard topic derivation.

Expand All @@ -96,11 +96,11 @@ def topicDerive(msg, options ) -> list:
topic_prefix = options['topicPrefix']
topic_separator='.'

if 'topic' in msg:
if type(msg['topic']) is list:
topic = msg['topic']
if topicOverride:
if topicOverride is list:
petersilva marked this conversation as resolved.
Show resolved Hide resolved
topic = topicOverride
else:
topic = msg['topic'].split(topic_separator)
topic = topicOverride.split(topic_separator)
elif 'topic' in options and options['topic'] and (type(options['topic']) is not list):
topic = options['topic'].split(topic_separator)
else:
Expand Down
4 changes: 2 additions & 2 deletions sarracenia/postformat/v02.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def importMine(body, headers, options) -> sarracenia.Message:
return msg

@staticmethod
def exportMine(body, options) -> (str, dict, str):
def exportMine(body, options, topicOverride=None) -> (str, dict, str):
"""
given a v03 (internal) message, produce an encoded version.
"""
Expand All @@ -158,6 +158,6 @@ def exportMine(body, options) -> (str, dict, str):
if h in v2m.headers:
del v2m.headers[h]

v2m.headers['topic'] = PostFormat.topicDerive( body, options )
v2m.headers['topic'] = PostFormat.topicDerive( body, options, topicOverride )

return v2m.notice, v2m.headers, V02.content_type()
4 changes: 2 additions & 2 deletions sarracenia/postformat/v03.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ def importMine(body, headers, options) -> sarracenia.Message:
return msg

@staticmethod
def exportMine(body,options) -> (str, dict, str):
def exportMine(body,options,topicOverride=None) -> (str, dict, str):
"""
given a v03 (internal) message, produce an encoded version.
"""
raw_body = json.dumps(body)

headers = { 'topic': PostFormat.topicDerive(body,options) }
headers = { 'topic': PostFormat.topicDerive(body,options,topicOverride) }

return raw_body, headers, V03.content_type()
9 changes: 6 additions & 3 deletions sarracenia/postformat/wis.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def importMine(body, headers, options) -> sarracenia.Message:
return msg

@staticmethod
def exportMine(body, options) -> (str, dict, str):
def exportMine(body, options, topicOverride=None) -> (str, dict, str):
"""
given a v03 (internal) message, produce an encoded version.
"""
Expand All @@ -125,13 +125,16 @@ def exportMine(body, options) -> (str, dict, str):
if literal in body:
GeoJSONBody[literal] = body[literal]

if 'topic' in body:
topic = body['topic'].split('/')
if topicOverride:
topic=topicOverride
elif 'topic' in options:
topic=options['topic'].split('/')
else:
topic= []

if type(topic) is not list:
topic = topic.split('/')

headers = { 'topic' : topic }
"""
topicPrefix and body['subtopic'] could be used to build a topic...
Expand Down
Loading