diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index 6d727497..0b87e70d 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -162,7 +162,7 @@ def init_yampl_socket(self, executable: str) -> str: is_mt = "--multithreaded=true" in executable.lower() if is_ca: if is_mt: - preexec_socket_config = f" --mtes=True --mtes_channel=\"{socket_name}\" " + preexec_socket_config = f" --mtes=True --mtes-channel=\"{socket_name}\" " else: preexec_socket_config = f" --preExec 'ConfigFlags.MP.EventRangeChannel=\"{socket_name}\"' " else: @@ -483,17 +483,7 @@ def parse_out_message(self, message: str) -> dict: """ logger.debug(f'parsing message: {message}') try: - if message.startswith("/"): - parts = message.split(",") - ret = {'output': parts[0]} - parts = parts[1:] - for part in parts: - name, value = part.split(":") - name = name.lower() - ret[name] = value - ret['status'] = 'finished' - return ret - elif message.startswith('ERR'): + if message.startswith('ERR'): if "ERR_ATHENAMP_PARSE" in message: pattern = re.compile(r"(ERR\_[A-Z\_]+)\ (.+)\:\ ?(.+)") found = re.findall(pattern, message) @@ -513,7 +503,15 @@ def parse_out_message(self, message: str) -> dict: ret = {'id': event_range_id, 'status': 'failed', 'message': message} return ret else: - raise UnknownException(f"Unknown message {message}") + parts = message.split(",") + ret = {'output': parts[0]} + parts = parts[1:] + for part in parts: + name, value = part.split(":") + name = name.lower() + ret[name] = value + ret['status'] = 'finished' + return ret except PilotException as e: raise e except Exception as e: diff --git a/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py b/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py index 47e639c3..53fd52f1 100644 --- a/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py @@ -183,6 +183,8 @@ def handle_out_message(self, message: dict): if message['status'] in ['failed', 'fatal']: self.update_failed_event_ranges([message]) else: + if 'output' in message: + message['output'] = os.path.join(self.get_job().workdir, message['output']) self.__queued_out_messages.append(message) def stageout_es(self, force: bool = False):