Skip to content

Commit

Permalink
Merge pull request #119 from esseivaju/mtes-fix
Browse files Browse the repository at this point in the history
fixup: mtes arg typo and ES file path
  • Loading branch information
PalNilsson authored Mar 25, 2024
2 parents 405004e + 682b10e commit 7d66f6a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
24 changes: 11 additions & 13 deletions pilot/eventservice/esprocess/esprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def init_yampl_socket(self, executable: str) -> str:
is_mt = "--multithreaded=true" in executable.lower()
if is_ca:
if is_mt:
preexec_socket_config = f" --mtes=True --mtes_channel=\"{socket_name}\" "
preexec_socket_config = f" --mtes=True --mtes-channel=\"{socket_name}\" "
else:
preexec_socket_config = f" --preExec 'ConfigFlags.MP.EventRangeChannel=\"{socket_name}\"' "
else:
Expand Down Expand Up @@ -483,17 +483,7 @@ def parse_out_message(self, message: str) -> dict:
"""
logger.debug(f'parsing message: {message}')
try:
if message.startswith("/"):
parts = message.split(",")
ret = {'output': parts[0]}
parts = parts[1:]
for part in parts:
name, value = part.split(":")
name = name.lower()
ret[name] = value
ret['status'] = 'finished'
return ret
elif message.startswith('ERR'):
if message.startswith('ERR'):
if "ERR_ATHENAMP_PARSE" in message:
pattern = re.compile(r"(ERR\_[A-Z\_]+)\ (.+)\:\ ?(.+)")
found = re.findall(pattern, message)
Expand All @@ -513,7 +503,15 @@ def parse_out_message(self, message: str) -> dict:
ret = {'id': event_range_id, 'status': 'failed', 'message': message}
return ret
else:
raise UnknownException(f"Unknown message {message}")
parts = message.split(",")
ret = {'output': parts[0]}
parts = parts[1:]
for part in parts:
name, value = part.split(":")
name = name.lower()
ret[name] = value
ret['status'] = 'finished'
return ret
except PilotException as e:
raise e
except Exception as e:
Expand Down
2 changes: 2 additions & 0 deletions pilot/eventservice/workexecutor/plugins/raythenaexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ def handle_out_message(self, message: dict):
if message['status'] in ['failed', 'fatal']:
self.update_failed_event_ranges([message])
else:
if 'output' in message:
message['output'] = os.path.join(self.get_job().workdir, message['output'])
self.__queued_out_messages.append(message)

def stageout_es(self, force: bool = False):
Expand Down

0 comments on commit 7d66f6a

Please sign in to comment.