Skip to content

Commit

Permalink
Merge pull request #1273 from MetPX/issue1271
Browse files Browse the repository at this point in the history
Issue1271 - Change raw2bulletin renamer to an `after_accept` entry point
  • Loading branch information
petersilva authored Oct 25, 2024
2 parents 7cce208 + 3e6df6f commit 247faba
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 471 deletions.
15 changes: 8 additions & 7 deletions sarracenia/flowcb/gather/am.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@

import sarracenia
from sarracenia.bulletin import Bulletin
from sarracenia.flowcb.rename.raw2bulletin import Raw2bulletin
import sarracenia.config
from sarracenia.flowcb import FlowCB

Expand All @@ -80,7 +79,6 @@ def __init__(self, options):

super().__init__(options,logger)
self.bulletinHandler = Bulletin()
self.renamer = Raw2bulletin(self.o)

self.url = urllib.parse.urlparse(self.o.sendTo)

Expand Down Expand Up @@ -522,6 +520,9 @@ def gather(self, messageCountMax):
"value":decoded_bulletin
}

# For renamer (to be deleted after rename plugin is called)
msg['isProblem'] = isProblem

# Receiver is looking for raw message.
msg['size'] = len(bulletin)

Expand All @@ -536,11 +537,11 @@ def gather(self, messageCountMax):
ident.update(bulletin)
msg['identity'] = {'method':self.o.identity_method, 'value':ident.value}

# Call renamer
msg = self.renamer.rename(msg,isProblem)
if msg == None:
continue
logger.debug(f"New sarracenia message: {msg}")
# # Call renamer
# msg = self.renamer.rename(msg,isProblem)
# if msg == None:
# continue
# logger.debug(f"New sarracenia message: {msg}")

newmsg.append(msg)

Expand Down
13 changes: 13 additions & 0 deletions sarracenia/flowcb/rename/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

"""
sarracenia.flowcb.rename modules are ones where the main focus is on the after_accept entry point.
These plugins should be used when the filename is desired to be renamed before the file is worked upon
(downloaded).
Problematic or wrong files should be moved to worklist.rejected to be properly discarded.
"""

pass
166 changes: 87 additions & 79 deletions sarracenia/flowcb/rename/raw2bulletin.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,89 +82,97 @@ def __init__(self,options) :
self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG'])

# If file was converted, get rid of extensions it had
def rename(self,msg,isProblem):

path = msg['new_dir'] + '/' + msg['new_file']

data = self.bulletinHandler.getData(msg, path)

# AM bulletins that need their filename rewritten with data should only have two chars before the first underscore
# This is in concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71
# These messages are still good, so we will add them to the good_msgs list
# if len(filenameFirstChars) != 2 and self.binary:
# good_msgs.append(msg)
# continue

if data == None:
return None

lines = data.split('\n')
#first_line = lines[0].strip('\r')
#first_line = first_line.strip(' ')
#first_line = first_line.strip('\t')
first_line = lines[0].split(' ')

# Build header from bulletin
header = self.bulletinHandler.buildHeader(first_line)
if header == None:
logger.error("Unable to fetch header contents. Skipping message")
return None

# Get the station timestamp from bulletin
if len(header.split('_')) == 2:
ddhhmm = self.bulletinHandler.getTime(data)
if ddhhmm == None:
logger.error("Unable to get julian time.")
else:
ddhhmm = ''

# Get the BBB from bulletin
BBB = self.bulletinHandler.getBBB(first_line)

# Get the station ID from bulletin
stn_id = self.bulletinHandler.getStation(data)

# Generate a sequence (random ints)
seq = self.bulletinHandler.getSequence()



# Rename file with data fetched
try:
# We can't disseminate bulletins downstream if they're missing the timestamp, but we want to keep the bulletins to troubleshoot source problems
# We'll append "_PROBLEM" to the filename to be able to identify erronous bulletins
if ddhhmm == None or isProblem:
timehandler = datetime.datetime.now()

# Add current time as new timestamp to filename
new_file = header + "_" + timehandler.strftime('%d%H%M') + "_" + BBB + "_" + stn_id + "_" + seq + "_PROBLEM"

# Write the file manually as the messages don't get posted downstream.
# The message won't also get downloaded further downstream
msg['new_file'] = new_file
new_path = msg['new_dir'] + '/' + msg['new_file']
def after_accept(self,worklist):

new_worklist = []

for msg in worklist.incoming:
path = msg['new_dir'] + '/' + msg['new_file']

data = self.bulletinHandler.getData(msg, path)

# AM bulletins that need their filename rewritten with data should only have two chars before the first underscore
# This is in concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71
# These messages are still good, so we will add them to the good_msgs list
# if len(filenameFirstChars) != 2 and self.binary:
# good_msgs.append(msg)
# continue

if data == None:
logger.error("No data was found. Skipping message")
worklist.rejected.append(msg)
continue

lines = data.split('\n')
#first_line = lines[0].strip('\r')
#first_line = first_line.strip(' ')
#first_line = first_line.strip('\t')
first_line = lines[0].split(' ')

# Build header from bulletin
header = self.bulletinHandler.buildHeader(first_line)
if header == None:
logger.error("Unable to fetch header contents. Skipping message")
worklist.rejected.append(msg)
continue

# Get the station timestamp from bulletin
if len(header.split('_')) == 2:
ddhhmm = self.bulletinHandler.getTime(data)
if ddhhmm == None:
logger.error("Unable to get julian time.")
else:
ddhhmm = ''

# Get the BBB from bulletin
BBB = self.bulletinHandler.getBBB(first_line)

# with open(new_path, 'w') as f: f.write(data)
# Get the station ID from bulletin
stn_id = self.bulletinHandler.getStation(data)

logger.error(f"New filename (for problem file): {new_file}")
elif stn_id == None:
new_file = header + "_" + BBB + "_" + '' + "_" + seq + "_PROBLEM"
logger.error(f"New filename (for problem file): {new_file}")
elif ddhhmm == '':
new_file = header + "_" + BBB + "_" + stn_id + "_" + seq
else:
new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq
# Generate a sequence (random ints)
seq = self.bulletinHandler.getSequence()

msg['new_file'] = new_file
# We need the rest of the fields to be also updated
del(msg['relPath'])
msg.updatePaths(self.o, msg['new_dir'], msg['new_file'])


logger.info(f"New filename (with path): {msg['relPath']}")
# Rename file with data fetched
try:
# We can't disseminate bulletins downstream if they're missing the timestamp, but we want to keep the bulletins to troubleshoot source problems
# We'll append "_PROBLEM" to the filename to be able to identify erronous bulletins
if ddhhmm == None or msg["isProblem"]:
timehandler = datetime.datetime.now()

return msg
# Add current time as new timestamp to filename
new_file = header + "_" + timehandler.strftime('%d%H%M') + "_" + BBB + "_" + stn_id + "_" + seq + "_PROBLEM"

except Exception as e:
logger.error(f"Error in renaming. Error message: {e}")
# Write the file manually as the messages don't get posted downstream.
# The message won't also get downloaded further downstream
msg['new_file'] = new_file
new_path = msg['new_dir'] + '/' + msg['new_file']

return None
# with open(new_path, 'w') as f: f.write(data)

logger.error(f"New filename (for problem file): {new_file}")
elif stn_id == None:
new_file = header + "_" + BBB + "_" + '' + "_" + seq + "_PROBLEM"
logger.error(f"New filename (for problem file): {new_file}")
elif ddhhmm == '':
new_file = header + "_" + BBB + "_" + stn_id + "_" + seq
else:
new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq

msg['new_file'] = new_file
# We need the rest of the fields to be also updated
del(msg['relPath'])
# No longer needed
del(msg['isProblem'])
msg.updatePaths(self.o, msg['new_dir'], msg['new_file'])

logger.info(f"New filename (with path): {msg['relPath']}")
new_worklist.append(msg)

except Exception as e:
logger.error(f"Error in renaming. Error message: {e}")
continue

worklist.incoming = new_worklist
87 changes: 67 additions & 20 deletions tests/sarracenia/flowcb/gather/am__gather_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ def make_message():
m["_deleteOnPost"] = set()
return m

# NOTE: Need to test filtering as well?
# WorkList = types.SimpleNamespace()
# WorkList.ok = []
# WorkList.incoming = []
# WorkList.rejected = []
# WorkList.failed = []
# WorkList.directories_ok = []
def make_worklist():
WorkList = types.SimpleNamespace()
WorkList.ok = []
WorkList.incoming = []
WorkList.rejected = []
WorkList.failed = []
WorkList.directories_ok = []
return WorkList

# def test___init__():
# BaseOptions = Options()
Expand Down Expand Up @@ -105,10 +106,14 @@ def test_am_binary_bulletin():
message_test1['new_file'] = bulletinHeader + '__12345'
message_test1['new_dir'] = BaseOptions.directory
message_test1['content']['value'] = b64encode(message_test1['content']['value']).decode('ascii')
message_test1["isProblem"] = False

worklist = make_worklist()
worklist.incoming = [message_test1]

# Check renamer.
message_test1 = renamer.rename(message_test1, False)
assert message_test1['new_file'] == 'ISAA41_CYWA_030000___00001'
renamer.after_accept(worklist)
assert worklist.incoming[0]['new_file'] == 'ISAA41_CYWA_030000___00001'


# Test 2: Check a regular CACN bulletin
Expand All @@ -132,10 +137,16 @@ def test_cacn_regular():
new_bulletin, isProblem = am_instance.correctContents(bulletin, firstchars, lines, missing_ahl, station, charset)
assert new_bulletin == b'CACN00 CWAO 021600\nWVO\n100,2024,123,1600,0,100,13.5,5.6,79.4,0.722,11.81,11.74,1.855,6.54,16.76,1544,2.344,14.26,0,375.6,375.6,375.5,375.5,0,11.58,11.24,3.709,13.89,13.16,11.22,11,9.45,11.39,5.033,79.4,0.694,-6999,41.19,5.967,5.887,5.93,6.184,5.64,5.066,5.253,-6999,7.3,0.058,0,5.715,4.569,0,0,1.942,-6999,57.4,0,0.531,-6999,1419,1604,1787,-6999,-6999,-6999,-6999,-6999,1601,-6999,-6999,6,5.921,5.956,6.177,5.643,5.07,5.256,-6999,9.53,11.22,10.09,10.61,125.4,9.1\n'


# Check renamer.
message_test2['content']['value'] = new_bulletin.decode('iso-8859-1')
message_test2 = renamer.rename(message_test2, False)
assert message_test2['new_file'] == 'CACN00_CWAO_021600__WVO_00001'
message_test2["isProblem"] = isProblem

worklist = make_worklist()
worklist.incoming = [message_test2]

renamer.after_accept(worklist)
assert worklist.incoming[0]['new_file'] == 'CACN00_CWAO_021600__WVO_00001'

# Test 3: Check an erronous CACN bulletin (missing timestamp in bulletin contents)
def test_cacn_erronous():
Expand All @@ -161,8 +172,14 @@ def test_cacn_erronous():

# Check renamer.
message_test3['content']['value'] = new_bulletin.decode('iso-8859-1')
message_test3 = renamer.rename(message_test3, False)
assert re.match('CACN00_CWAO_......__WPK_00001_PROBLEM' , message_test3['new_file'])
message_test3["isProblem"] = isProblem

worklist = make_worklist()
worklist.incoming = [message_test3]


renamer.after_accept(worklist)
assert re.match('CACN00_CWAO_......__WPK_00001_PROBLEM' , worklist.incoming[0]['new_file'])

# Test 4: Bulletin with double line separator after header (my-header\n\n)
def test_bulletin_double_linesep():
Expand All @@ -188,7 +205,12 @@ def test_bulletin_double_linesep():

# Check renamer.
message_test4['content']['value'] = message_test4['content']['value'].decode('iso-8859-1')
message_test4 = renamer.rename(message_test4, False)
message_test4["isProblem"] = isProblem

worklist = make_worklist()
worklist.incoming = [message_test4]

renamer.after_accept(worklist)
assert message_test4['new_file'] == 'SXCN35_CWVR_021100___00001'

# Test 5: Bulletin with invalid year in timestamp (Fix: https://github.com/MetPX/sarracenia/pull/973)
Expand All @@ -213,7 +235,12 @@ def test_bulletin_invalid_timestamp(caplog):
assert new_bulletin == b'CACN00 CWAO\nWVO\n100,1024,123,1600,0,100,13.5,5.6,79.4,0.722,11.81,11.74,1.855,6.54,16.76,1544,2.344,14.26,0,375.6,375.6,375.5,375.5,0,11.58,11.24,3.709,13.89,13.16,11.22,11,9.45,11.39,5.033,79.4,0.694,-6999,41.19,5.967,5.887,5.93,6.184,5.64,5.066,5.253,-6999,7.3,0.058,0,5.715,4.569,0,0,1.942,-6999,57.4,0,0.531,-6999,1419,1604,1787,-6999,-6999,-6999,-6999,-6999,1601,-6999,-6999,6,5.921,5.956,6.177,5.643,5.07,5.256,-6999,9.53,11.22,10.09,10.61,125.4,9.1\n'

message_test5['content']['value'] = message_test5['content']['value'].decode('iso-8859-1')
message_test5 = renamer.rename(message_test5, False)
message_test5["isProblem"] = isProblem

worklist = make_worklist()
worklist.incoming = [message_test5]

renamer.after_accept(worklist)
# We want to make sure the proper errors are raised from the logs
assert 'Unable to fetch header contents. Skipping message' in caplog.text and 'Unable to verify year from julian time.' in caplog.text

Expand Down Expand Up @@ -265,7 +292,12 @@ def test_bulletin_wrong_station():

# Check renamer.
message_test7['content']['value'] = message_test7['content']['value'].decode('iso-8859-1')
message_test7 = renamer.rename(message_test7, False)
message_test7["isProblem"] = isProblem

worklist = make_worklist()
worklist.incoming = [message_test7]

renamer.after_accept(worklist)
assert message_test7['new_file'] == 'UECN99_CYCX_071200___00001_PROBLEM'

# Test 8: SM Bulletin - Add station mapping + SM/SI bulletin accomodities
Expand All @@ -291,7 +323,12 @@ def test_SM_bulletin():
assert new_bulletin == b'SMCN06 CWAO 030000\nAAXX 03004\n71816 11324 80313 10004 20003 30255 40318 52018 60031 77177 887//\n333 10017 20004 42001 70118 90983 93101=\n'

message_test8['content']['value'] = new_bulletin.decode('iso-8859-1')
message_test8 = renamer.rename(message_test8, False)
message_test8["isProblem"] = isProblem

worklist = make_worklist()
worklist.incoming = [message_test8]

renamer.after_accept(worklist)
assert message_test8['new_file'] == 'SMCN06_CWAO_030000__71816_00001'

# Test 9: Bulletin with 5 fields in header (invalid)
Expand Down Expand Up @@ -378,7 +415,12 @@ def test_random_bulletin_with_BBB():
assert new_bulletin == b''

message_test12['content']['value'] = bulletin.decode('iso-8859-1')
message_test12 = renamer.rename(message_test12, False)
message_test12["isProblem"] = isProblem

worklist = make_worklist()
worklist.incoming = [message_test12]

renamer.after_accept(worklist)
assert message_test12['new_file'] == 'FXCN06_CYTR_230939_AAA__00001'

# Test 13: SM Bulletin with BBB - Add station mapping + SM/SI bulletin accomodities + conserve BBB header
Expand All @@ -404,5 +446,10 @@ def test_SM_bulletin_with_BBB():
assert new_bulletin == b'SMCN06 CWAO 030000 AAA\nAAXX 03004\n71816 11324 80313 10004 20003 30255 40318 52018 60031 77177 887//\n333 10017 20004 42001 70118 90983 93101=\n'

message_test13['content']['value'] = new_bulletin.decode('iso-8859-1')
message_test13 = renamer.rename(message_test13, False)
assert message_test13['new_file'] == 'SMCN06_CWAO_030000_AAA_71816_00001'
message_test13["isProblem"] = isProblem

worklist = make_worklist()
worklist.incoming = [message_test13]

renamer.after_accept(worklist)
assert message_test13['new_file'] == 'SMCN06_CWAO_030000_AAA_71816_00001'
Loading

0 comments on commit 247faba

Please sign in to comment.