Skip to content

Commit

Permalink
Merge branch 'development' into development_py36
Browse files Browse the repository at this point in the history
  • Loading branch information
petersilva committed Oct 30, 2024
2 parents 6b2be6b + 3d49841 commit b17b686
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 30 deletions.
8 changes: 8 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
metpx-sr3 (3.00.56rc3) unstable; urgency=medium

* fix #1278 regression: ack failure loops forever.
* fix #1271 AM charset issues, made some plugins more flexible.
* fix #1266 complain when explicitly asked to start disabled config.

-- SSC-5CD2310S60 <[email protected]> Wed, 30 Oct 2024 12:25:14 -0400

metpx-sr3 (3.00.56rc2) unstable; urgency=medium

* fix #1261 http performance regression.
Expand Down
3 changes: 2 additions & 1 deletion sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,10 +1019,11 @@ def getContent(msg,options=None):

# inlined/embedded case.
if 'content' in msg:
logger.info("Getting msg from inline'd content")
if msg['content']['encoding'] == 'base64':
return b64decode(msg['content']['value'])
else:
return msg['content']['value'].encode('utf-8')
return msg['content']['value'].encode('utf-8') if not hasattr(options,'inputCharset') else msg['content']['value'].encode(options.inputCharset)

path=''
if msg['baseUrl'].startswith('file:'):
Expand Down
8 changes: 5 additions & 3 deletions sarracenia/bulletin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class Bulletin:
from sarracenia.bulletin import Bulletin
"""

def __init__(self):
def __init__(self,options):
super().__init__()
self.o = options
self.seq = 0
self.binary = 0

Expand Down Expand Up @@ -125,7 +127,7 @@ def getData(self, msg, path):
try:

self.binary = 0
if msg['content']:
if 'content' in msg:
data = msg['content']['value']

# Change from b64. We want to get the header from the raw binary data. Not retrievable in b64 format
Expand Down Expand Up @@ -339,4 +341,4 @@ def getTime(self, data):
ddHHMM = time.strftime('%d%H%M', timeStruct)
return ddHHMM
except Exception as e:
return None
return None
2 changes: 1 addition & 1 deletion sarracenia/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ def parse_line(self, component, cfg, cfname, lineno, l ):
# specify a second queue with different bindings... so this warning could be complaining about something
# that is correct. but in every current case, the warning will be helpful.
if ( k == 'queueName' ) and self.subtopic_seen:
logger.warning( f"queueName usually should be before subtopic in configs: subtopic to default queue" )
logger.warning( f"{','.join(self.files)}:{lineno} queueName usually should be before subtopic in configs: subtopic to default queue" )
if ( k == 'directory' ) and not self.download:
logger.info( f"{','.join(self.files)}:{lineno} if download is false, directory has no effect" )

Expand Down
4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_2mqtt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ post_exchange xpublic

directory /tmp/dd_2mqt

# new topic... in 2025
subtopic *.WXO-DD.bulletins.#

# old topics likely replaced by above in 2025
subtopic bulletins.#
4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_amis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ instances 5
# expire, in operational use, should be longer than longest expected interruption
expire 10m

# new topic... in 2025
subtopic *.WXO-DD.bulletins.alphanumeric.#

# old topics likely replaced by above in 2025
subtopic bulletins.alphanumeric.#

directory /tmp/dd_amis
4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_aqhi.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ instances 2
# valeur basse bonne pour essais initial, valeur haute (1d == 1 jour) pour les opérations.
expire 10m

# new topic... in 2025
subtopic *.WXO-DD.air_quality.aqhi.#

# old topics likely replaced by above in 2025
subtopic air_quality.aqhi.#
directory /tmp/dd_aqhi

4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_cacn_bulletins.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ instances 2

expire 10m

# new topic... in 2025
subtopic *.WXO-DD.bulletins.alphanumeric.*.CA.*.#

# old topics likely replaced by above in 2025
subtopic bulletins.alphanumeric.*.CA.*.#
directory /tmp/cacn_bulletins
accept .*CACN45.*
Expand Down
4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_citypage.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ instances 2
# durée de vie du file d´attente sur le serveur. Pour usage opérationnel, augmentez a 1d (1 jour.)
expire 10m

# new topic... in 2025
subtopic *.WXO-DD.citypage_weather.#

# old topics likely replaced by above in 2025
subtopic citypage_weather.#
#subtopic citypage_weather.xml.YT.#

Expand Down
4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_cmml.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ instances 2
# le file est enlevé.
expire 10m

# new topic... in 2025
subtopic *.WXO-DD.meteocode.*.cmml.#

# old topics likely replaced by above in 2025
subtopic meteocode.*.cmml.#

directory /tmp/dd_cmml
Expand Down
4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_gdps.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,9 @@ instances 5
expire 10m
#expire, in operational use, use 1d (1 day) as it needs to be longer than the longest interruption in downloads we want to tolerate without dropping downloads.

# new topic... in 2025
subtopic *.WXO-DD.model_gem_global.25km.grib2.#

# old topics likely replaced by above in 2025
subtopic model_gem_global.25km.grib2.#
directory /tmp/dd_gdps
4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_radar.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ expire 10m
directory /tmp/dd_radar


# new topic... in 2025
subtopic *.WXO-DD.radar.CAPPI.GIF.XAM.#

# old topics likely replaced by above in 2025
subtopic radar.CAPPI.GIF.XAM.#
4 changes: 4 additions & 0 deletions sarracenia/examples/subscribe/dd_rdps.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@ expire 10m
# suggest 1d (1 day.)


# new topic... in 2025
subtopic *.WXO-DD.model_gem_regional.10km.grib2.#

# old topics likely replaced by above in 2025
subtopic model_gem_regional.10km.grib2.#
director /tmp/dd_rdps
5 changes: 5 additions & 0 deletions sarracenia/examples/subscribe/dd_swob.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ expire 10m
#expire, in operations, needs to be longer than the longest expected interruption

# All stations

# new topic... in 2025
subtopic *.WXO-DD.observations.swob-ml.#

# old topics likely replaced by above in 2025
subtopic observations.swob-ml.#

directory /tmp/dd_swob
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flowcb/gather/am.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Am(FlowCB):
def __init__(self, options):

super().__init__(options,logger)
self.bulletinHandler = Bulletin()
self.bulletinHandler = Bulletin(self.o)

self.url = urllib.parse.urlparse(self.o.sendTo)

Expand Down
43 changes: 26 additions & 17 deletions sarracenia/flowcb/rename/raw2bulletin.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(self,options) :
super().__init__(options,logger)
self.seq = 0
self.binary = 0
self.bulletinHandler = Bulletin()
self.bulletinHandler = Bulletin(self.o)
# Need to redeclare these options to have their default values be initialized.
self.o.add_option('inputCharset', 'str', 'utf-8')
self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG'])
Expand All @@ -87,18 +87,23 @@ def after_accept(self,worklist):
new_worklist = []

for msg in worklist.incoming:
path = msg['new_dir'] + '/' + msg['new_file']

data = self.bulletinHandler.getData(msg, path)
# If called by a sarra, should always have post_baseDir, so should be OK in specifying it
path = self.o.post_baseDir + '/' + msg['relPath']

# AM bulletins that need their filename rewritten with data should only have two chars before the first underscore
# This is in concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71
# These messages are still good, so we will add them to the good_msgs list
# if len(filenameFirstChars) != 2 and self.binary:
# good_msgs.append(msg)
# continue
data = msg.getContent(self.o)

if data == None:
# Determine if bulletin is binary or not
# From sundew source code
if data.splitlines()[1][:4] in self.o.binaryInitialCharacters:
# Decode data, only text. The raw binary data contains the header in which we're interested. Only get that header.
data = data.splitlines()[0].decode('ascii')
else:
# Data is not binary
data = data.decode(self.o.inputCharset)


if not data:
logger.error("No data was found. Skipping message")
worklist.rejected.append(msg)
continue
Expand Down Expand Up @@ -133,13 +138,16 @@ def after_accept(self,worklist):
# Generate a sequence (random ints)
seq = self.bulletinHandler.getSequence()


# Assign a default value for messages not coming from AM
if 'isProblem' not in msg:
msg['isProblem'] = False


# Rename file with data fetched
try:
# We can't disseminate bulletins downstream if they're missing the timestamp, but we want to keep the bulletins to troubleshoot source problems
# We'll append "_PROBLEM" to the filename to be able to identify erronous bulletins
if ddhhmm == None or msg["isProblem"]:
if ddhhmm == None or msg['isProblem']:
timehandler = datetime.datetime.now()

# Add current time as new timestamp to filename
Expand All @@ -162,13 +170,14 @@ def after_accept(self,worklist):
new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq

msg['new_file'] = new_file
# We need the rest of the fields to be also updated
del(msg['relPath'])

# No longer needed
del(msg['isProblem'])
msg.updatePaths(self.o, msg['new_dir'], msg['new_file'])
if 'isProblem' in msg:
del(msg['isProblem'])

# msg.updatePaths(self.o, msg['new_dir'], msg['new_file'])

logger.info(f"New filename (with path): {msg['relPath']}")
logger.info(f"New filename: {msg['new_file']}")
new_worklist.append(msg)

except Exception as e:
Expand Down
12 changes: 5 additions & 7 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,11 @@ def ack(self, m: sarracenia.Message) -> None:
except Exception as err:
logger.warning("failed for tag: %s: %s" % (m['ack_id'], err))
logger.debug('Exception details: ', exc_info=True)
if type(err) == BrokenPipeError or type(err) == ConnectionResetError:
# Cleanly close partially broken connection
self.close()
# No point in trying to ack again if the connection is broken
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return False
# No point in trying to ack again if the connection is broken
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
self.close()
return False

if ebo < 60:
ebo *= 2
Expand Down
15 changes: 15 additions & 0 deletions sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,21 @@ def start(self):
if len(self.leftovers) > 0 and not self._action_all_configs:
logging.error( f"{self.leftovers} configuration not found" )
return

has_disabled_config = False

# if any configs are disabled, don't start any
if not self._action_all_configs:
for f in self.filtered_configurations:
(c, cfg) = f.split(os.sep)

if self.configs[c][cfg]['status'] == 'disabled':
has_disabled_config = True
logger.error(f"Config {c}/{cfg} is disabled. It must be enabled before starting.")

if has_disabled_config:
logger.error("No configs have been started due to disabled configurations.")
return

pcount = 0
for f in self.filtered_configurations:
Expand Down
2 changes: 2 additions & 0 deletions tests/sarracenia/flowcb/gather/am__gather_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(self):
self.fileAgeMax = 0
self.post_baseUrl = "http://localhost/"
self.post_format = "v02"
self.post_baseDir = "/this/path/is/fake"

def add_option(self, option, type, default = None):
if not hasattr(self, option):
Expand All @@ -49,6 +50,7 @@ def make_message():
m["to_clusters"] = "localhost"
m["baseUrl"] = "https://NotARealURL"
m["post_baseUrl"] = "https://NotARealURL"
m["post_baseDir"] = "/this/path/is/fake"
m["relPath"] = "ThisIsAPath/To/A/File.txt"
m["_deleteOnPost"] = set()
return m
Expand Down

0 comments on commit b17b686

Please sign in to comment.