From b7c0398d0f78c3bf119caed70a4e255f5669ed79 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 15 Jan 2024 20:26:50 +0100 Subject: [PATCH] updates to support finegrainedproc eventservice --- pilot/control/data.py | 3 + pilot/control/payload.py | 8 +- .../esprocess/esprocessfinegrainedproc.py | 363 +++++++ .../workexecutor/plugins/baseexecutor.py | 10 +- .../plugins/finegrainedprocexecutor.py | 285 ++++++ .../eventservice/workexecutor/workexecutor.py | 2 + pilot/user/rubin/esprocessfinegrainedproc.py | 890 ++++++++++++++++++ 7 files changed, 1558 insertions(+), 3 deletions(-) create mode 100644 pilot/eventservice/esprocess/esprocessfinegrainedproc.py create mode 100644 pilot/eventservice/workexecutor/plugins/finegrainedprocexecutor.py create mode 100644 pilot/user/rubin/esprocessfinegrainedproc.py diff --git a/pilot/control/data.py b/pilot/control/data.py index 473ca5a0..15d828e4 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -828,6 +828,7 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out """ logger.debug(f'preparing to create log file (debug mode={debugmode})') + logger.debug(f'workdir: {workdir}') # PILOT_HOME is the launch directory of the pilot (or the one specified in pilot options as pilot workdir) pilot_home = os.environ.get('PILOT_HOME', os.getcwd()) @@ -835,6 +836,8 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out if pilot_home != current_dir: os.chdir(pilot_home) + logger.debug(f'current_dir: {current_dir}') + # copy special files if they exist (could be made experiment specific if there's a need for it) copy_special_files(workdir) diff --git a/pilot/control/payload.py b/pilot/control/payload.py index 73a440dd..79409121 100644 --- a/pilot/control/payload.py +++ b/pilot/control/payload.py @@ -216,8 +216,12 @@ def execute_payloads(queues, traces, args): # noqa: C901 logger.debug(f'job {job.jobid} added to monitored payloads queue') try: - out = open(os.path.join(job.workdir, config.Payload.payloadstdout), 'wb') - err = open(os.path.join(job.workdir, config.Payload.payloadstderr), 'wb') + if job.is_eventservice or job.is_eventservicemerge: + out = open(os.path.join(job.workdir, config.Payload.payloadstdout), 'ab') + err = open(os.path.join(job.workdir, config.Payload.payloadstderr), 'ab') + else: + out = open(os.path.join(job.workdir, config.Payload.payloadstdout), 'wb') + err = open(os.path.join(job.workdir, config.Payload.payloadstderr), 'wb') except Exception as error: logger.warning(f'failed to open payload stdout/err: {error}') out = None diff --git a/pilot/eventservice/esprocess/esprocessfinegrainedproc.py b/pilot/eventservice/esprocess/esprocessfinegrainedproc.py new file mode 100644 index 00000000..641902fc --- /dev/null +++ b/pilot/eventservice/esprocess/esprocessfinegrainedproc.py @@ -0,0 +1,363 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Authors: +# - Wen Guan, wen.guan@cern.ch, 2023 + +import io +import logging +import os +import time +import threading +import traceback + +from pilot.common.exception import PilotException, MessageFailure, SetupFailure, RunPayloadFailure, UnknownException + + +logger = logging.getLogger(__name__) + +""" +Main process to handle event service. +It makes use of two hooks get_event_ranges_hook and handle_out_message_hook to communicate with other processes when +it's running. The process will handle the logic of Event service independently. +""" + + +class ESProcessFineGrainedProc(threading.Thread): + """ + Main EventService Process. + """ + def __init__(self, payload, waiting_time=30 * 60): + """ + Init ESProcessFineGrainedProc. + + :param payload: a dict of {'executable': , 'output_file': , 'error_file': } + """ + threading.Thread.__init__(self, name='esprocessFineGrainedProc') + + self.__payload = payload + + self.__process = None + + self.get_event_ranges_hook = None + self.handle_out_message_hook = None + + self.__monitor_log_time = None + self.is_no_more_events = False + self.__no_more_event_time = None + self.__waiting_time = waiting_time + self.__stop = threading.Event() + self.__stop_time = 180 + self.pid = None + self.__is_payload_started = False + + self.__ret_code = None + self.setName("ESProcessFineGrainedProc") + self.corecount = 1 + + self.event_ranges_cache = [] + + def is_payload_started(self): + return self.__is_payload_started + + def stop(self, delay=1800): + if not self.__stop.is_set(): + self.__stop.set() + self.__stop_set_time = time.time() + self.__stop_delay = delay + + def get_job_id(self): + if 'job' in self.__payload and self.__payload['job'] and self.__payload['job'].jobid: + return self.__payload['job'].jobid + return '' + + def get_corecount(self): + if 'job' in self.__payload and self.__payload['job'] and self.__payload['job'].corecount: + core_count = int(self.__payload['job'].corecount) + return core_count + return 1 + + def get_file(self, workdir, file_label='output_file', file_name='ES_payload_output.txt'): + """ + Return the requested file. + + :param file_label: + :param workdir: + :return: + """ + + try: + file_type = file # Python 2 + except NameError: + file_type = io.IOBase # Python 3 + + if file_label in self.__payload: + if isinstance(self.__payload[file_label], file_type): + _file_fd = self.__payload[file_label] + else: + _file = self.__payload[file_label] if '/' in self.__payload[file_label] else os.path.join(workdir, self.__payload[file_label]) + _file_fd = open(_file, 'w') + else: + _file = os.path.join(workdir, file_name) + _file_fd = open(_file, 'w') + + return _file_fd + + def get_workdir(self): + """ + Return the workdir. + If the workdir is set but is not a directory, return None. + + :return: workdir (string or None). + :raises SetupFailure: in case workdir is not a directory. + """ + + workdir = '' + if 'workdir' in self.__payload: + workdir = self.__payload['workdir'] + if not os.path.exists(workdir): + os.makedirs(workdir) + elif not os.path.isdir(workdir): + raise SetupFailure('workdir exists but is not a directory') + return workdir + + def get_executable(self, workdir): + """ + Return the executable string. + + :param workdir: work directory (string). + :return: executable (string). + """ + executable = self.__payload['executable'] + executable = self.get_payload_executable(executable) + return 'cd %s; %s' % (workdir, executable) + + def set_get_event_ranges_hook(self, hook): + """ + set get_event_ranges hook. + + :param hook: a hook method to get event ranges. + """ + + self.get_event_ranges_hook = hook + + def get_get_event_ranges_hook(self): + """ + get get_event_ranges hook. + + :returns: The hook method to get event ranges. + """ + + return self.get_event_ranges_hook + + def set_handle_out_message_hook(self, hook): + """ + set handle_out_message hook. + + :param hook: a hook method to handle payload output and error messages. + """ + + self.handle_out_message_hook = hook + + def get_handle_out_message_hook(self): + """ + get handle_out_message hook. + + :returns: The hook method to handle payload output and error messages. + """ + + return self.handle_out_message_hook + + def init(self): + """ + initialize message thread and payload process. + """ + + try: + pass + except Exception as e: + # TODO: raise exceptions + self.__ret_code = -1 + self.stop() + raise e + + def monitor(self): + """ + Monitor whether a process is dead. + + raises: MessageFailure: when the message thread is dead or exited. + RunPayloadFailure: when the payload process is dead or exited. + """ + pass + + def has_running_children(self): + """ + Check whether it has running children + + :return: True if there are alive children, otherwise False + """ + return False + + def is_payload_running(self): + """ + Check whether the payload is still running + + :return: True if the payload is running, otherwise False + """ + return False + + def get_event_ranges(self, num_ranges=None, queue_factor=1): + """ + Calling get_event_ranges hook to get event ranges. + + :param num_ranges: number of event ranges to get. + + :raises: SetupFailure: If get_event_ranges_hook is not set. + MessageFailure: when failed to get event ranges. + """ + if not num_ranges: + num_ranges = self.corecount + + logger.debug('getting event ranges(num_ranges=%s)' % num_ranges) + if not self.get_event_ranges_hook: + raise SetupFailure("get_event_ranges_hook is not set") + + try: + logger.debug('calling get_event_ranges hook(%s) to get event ranges.' % self.get_event_ranges_hook) + event_ranges = self.get_event_ranges_hook(num_ranges, queue_factor=queue_factor) + logger.debug('got event ranges: %s' % event_ranges) + return event_ranges + except Exception as e: + raise MessageFailure("Failed to get event ranges: %s" % e) + + def parse_out_message(self, message): + """ + Parse output or error messages from payload. + + :param message: The message string received from payload. + + :returns: a dict {'id': , 'status': , 'output': , 'cpu': , 'wall': , 'message': } + :raises: PilotExecption: when a PilotException is caught. + UnknownException: when other unknown exception is caught. + """ + + logger.debug('parsing message: %s' % message) + return message + + def handle_out_message(self, message): + """ + Handle output or error messages from payload. + Messages from payload will be parsed and the handle_out_message hook is called. + + :param message: The message string received from payload. + + :raises: SetupFailure: when handle_out_message_hook is not set. + RunPayloadFailure: when failed to handle an output or error message. + """ + + logger.debug('handling out message: %s' % message) + if not self.handle_out_message_hook: + raise SetupFailure("handle_out_message_hook is not set") + + try: + message_status = self.parse_out_message(message) + logger.debug('parsed out message: %s' % message_status) + logger.debug('calling handle_out_message hook(%s) to handle parsed message.' % self.handle_out_message_hook) + self.handle_out_message_hook(message_status) + except Exception as e: + raise RunPayloadFailure("Failed to handle out message: %s" % e) + + def poll(self): + """ + poll whether the process is still running. + + :returns: None: still running. + 0: finished successfully. + others: failed. + """ + return self.__ret_code + + def terminate(self, time_to_wait=1): + """ + Terminate running threads and processes. + + :param time_to_wait: integer, seconds to wait to force kill the payload process. + + :raises: PilotExecption: when a PilotException is caught. + UnknownException: when other unknown exception is caught. + """ + logger.info('terminate running threads and processes.') + try: + self.stop() + except Exception as e: + logger.error('Exception caught when terminating ESProcessFineGrainedProc: %s' % e) + self.__ret_code = -1 + raise UnknownException(e) + + def kill(self): + """ + Terminate running threads and processes. + + :param time_to_wait: integer, seconds to wait to force kill the payload process. + + :raises: PilotException: when a PilotException is caught. + UnknownException: when other unknown exception is caught. + """ + logger.info('terminate running threads and processes.') + try: + self.stop() + except Exception as e: + logger.error('Exception caught when terminating ESProcessFineGrainedProc: %s' % e) + raise UnknownException(e) + + def clean(self): + """ + Clean left resources + """ + self.stop() + + def run(self): + """ + Main run loops: monitor message thread and payload process. + handle messages from payload and response messages with injecting new event ranges or process outputs. + + :raises: PilotExecption: when a PilotException is caught. + UnknownException: when other unknown exception is caught. + """ + + logger.info('start esprocess with thread ident: %s' % (self.ident)) + logger.debug('initializing') + self.init() + logger.debug('initialization finished.') + + logger.info('starts to main loop') + while self.is_payload_running(): + try: + self.monitor() + time.sleep(0.01) + except PilotException as e: + logger.error('PilotException caught in the main loop: %s, %s' % (e.get_detail(), traceback.format_exc())) + # TODO: define output message exception. If caught 3 output message exception, terminate + self.stop() + except Exception as e: + logger.error('Exception caught in the main loop: %s, %s' % (e, traceback.format_exc())) + # TODO: catch and raise exceptions + # if catching dead process exception, terminate. + self.stop() + break + self.clean() + logger.debug('main loop finished') diff --git a/pilot/eventservice/workexecutor/plugins/baseexecutor.py b/pilot/eventservice/workexecutor/plugins/baseexecutor.py index 9279fd94..54b4268d 100644 --- a/pilot/eventservice/workexecutor/plugins/baseexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/baseexecutor.py @@ -56,6 +56,8 @@ def __init__(self, **kwargs): self.proc = None + self.current_dir = os.getcwd() + def get_pid(self): return self.proc.pid if self.proc else None @@ -68,13 +70,17 @@ def is_payload_started(self): return False def start(self): - super(BaseExecutor, self).start() self.communication_manager = CommunicationManager() self.communication_manager.start() + super(BaseExecutor, self).start() def stop(self): if not self.is_stop(): self.__stop.set() + if self.communication_manager: + self.communication_manager.stop() + os.chdir(self.current_dir) + logger.info("change current dir from %s to %s" % (os.getcwd(), self.current_dir)) def is_stop(self): return self.__stop.is_set() @@ -92,7 +98,9 @@ def set_payload(self, payload): self.__is_set_payload = True job = self.get_job() if job and job.workdir: + current_dir = os.getcwd() os.chdir(job.workdir) + logger.info("change current dir from %s to %s" % (current_dir, job.workdir)) def is_set_payload(self): return self.__is_set_payload diff --git a/pilot/eventservice/workexecutor/plugins/finegrainedprocexecutor.py b/pilot/eventservice/workexecutor/plugins/finegrainedprocexecutor.py new file mode 100644 index 00000000..b78b904a --- /dev/null +++ b/pilot/eventservice/workexecutor/plugins/finegrainedprocexecutor.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Authors: +# - Wen Guan, wen.guan@cern.ch, 2023 - 2024 + +import json +import os +import time +import traceback + +from pilot.common.errorcodes import ErrorCodes + +from .baseexecutor import BaseExecutor + +import logging +logger = logging.getLogger(__name__) + +errors = ErrorCodes() + +""" +FineGrainedProc Executor with one process to manage EventService +""" + + +class FineGrainedProcExecutor(BaseExecutor): + def __init__(self, **kwargs): + super(FineGrainedProcExecutor, self).__init__(**kwargs) + self.setName("FineGrainedProcExecutor") + + self.__queued_out_messages = [] + self.__stageout_failures = 0 + self.__max_allowed_stageout_failures = 20 + self.__last_stageout_time = None + self.__all_out_messages = [] + + self.proc = None + self.exit_code = None + + def is_payload_started(self): + return self.proc.is_payload_started() if self.proc else False + + def get_pid(self): + return self.proc.pid if self.proc else None + + def get_exit_code(self): + return self.exit_code + + def update_finished_event_ranges(self, out_messagess, output_file, fsize, checksum, storage_id): + """ + Update finished event ranges + + :param out_messages: messages from AthenaMP. + :param output_file: output file name. + :param fsize: file size. + :param adler32: checksum (adler32) of the file. + :param storage_id: the id of the storage. + """ + + if len(out_messagess) == 0: + return + + event_ranges = [] + for out_msg in out_messagess: + event_ranges.append({"eventRangeID": out_msg['id'], "eventStatus": 'finished'}) + event_range_status = {"zipFile": {"numEvents": len(event_ranges), + "objstoreID": storage_id, + "lfn": os.path.basename(output_file), + "fsize": fsize, + "pathConvention": 1000}, + "eventRanges": event_ranges} + for checksum_key in checksum: + event_range_status["zipFile"][checksum_key] = checksum[checksum_key] + event_range_message = {'version': 1, 'eventRanges': json.dumps([event_range_status])} + self.update_events(event_range_message) + + job = self.get_job() + job.nevents += len(event_ranges) + + def update_failed_event_ranges(self, out_messagess): + """ + Update failed event ranges + + :param out_messages: messages from AthenaMP. + """ + + if len(out_messagess) == 0: + return + + event_ranges = [] + for message in out_messagess: + status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed' + # ToBeFixed errorCode + event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE, "eventRangeID": message['id'], "eventStatus": status}) + event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)} + self.update_events(event_range_message) + + def update_terminated_event_ranges(self, out_messagess): + """ + Update terminated event ranges + + :param out_messages: messages from AthenaMP. + """ + + if len(out_messagess) == 0: + return + + event_ranges = [] + finished_events = 0 + for message in out_messagess: + if message['status'] in ['failed', 'fatal', 'finished', 'running', 'transferring']: + status = message['status'] + if message['status'] in ['finished']: + finished_events += 1 + else: + logger.warn("status is unknown for messages, set it running: %s" % str(message)) + status = 'running' + error_code = message.get("error_code", None) + if status in ["failed", "fatal"] and error_code is None: + error_code = errors.UNKNOWNPAYLOADFAILURE + error_diag = message.get("error_diag") + + event_range = {"eventRangeID": message['id'], "eventStatus": status, "errorCode": error_code, "errorDiag": error_diag} + event_ranges.append(event_range) + event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)} + self.update_events(event_range_message) + + job = self.get_job() + job.nevents += finished_events + + def handle_out_message(self, message): + """ + Handle ES output or error messages hook function for tests. + + :param message: a dict of parsed message. + For 'finished' event ranges, it's {'id': , 'status': 'finished', 'output': , 'cpu': , + 'wall': , 'message': }. + Fro 'failed' event ranges, it's {'id': , 'status': 'failed', 'message': }. + """ + + logger.info(f"handling out message: {message}") + + self.__all_out_messages.append(message) + + self.__queued_out_messages.append(message) + + def stageout_es(self, force=False): + """ + Stage out event service outputs. + When pilot fails to stage out a file, the file will be added back to the queue for staging out next period. + """ + + job = self.get_job() + if len(self.__queued_out_messages): + if force or self.__last_stageout_time is None or (time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap): + + out_messages = [] + while len(self.__queued_out_messages) > 0: + out_messages.append(self.__queued_out_messages.pop()) + + if out_messages: + self.__last_stageout_time = time.time() + self.update_terminated_event_ranges(out_messages) + + def clean(self): + """ + Clean temp produced files + """ + + for msg in self.__all_out_messages: + if msg['status'] in ['failed', 'fatal']: + pass + elif 'output' in msg: + try: + logger.info(f"removing ES pre-merge file: {msg['output']}") + os.remove(msg['output']) + except Exception as exc: + logger.error(f"failed to remove file({msg['output']}): {exc}") + self.__queued_out_messages = [] + self.__stageout_failures = 0 + self.__last_stageout_time = None + self.__all_out_messages = [] + + if self.proc: + self.proc.stop() + while self.proc.is_alive(): + time.sleep(0.1) + + self.stop_communicator() + self.stop() + + def get_esprocess_finegrainedproc(self, payload): + # get the payload command from the user specific code + try: + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + esprocessfinegrainedproc = __import__(f'pilot.user.{pilot_user}.esprocessfinegrainedproc', + globals(), locals(), [pilot_user], 0) + proc = esprocessfinegrainedproc.ESProcessFineGrainedProc(payload) + return proc + except Exception as ex: + logger.warn("use specific ESProcessFineGrainedProc does not exist. Using the pilot.eventservice.esprocess.esprocessfinegrainedproc: " + str(ex)) + from pilot.eventservice.esprocess.esprocessfinegrainedproc import ESProcessFineGrainedProc + proc = ESProcessFineGrainedProc(payload) + return proc + + def run(self): + """ + Initialize and run ESProcess. + """ + + try: + logger.info("starting ES FineGrainedProcExecutor with thread identifier: %s" % (self.ident)) + if self.is_set_payload(): + payload = self.get_payload() + elif self.is_retrieve_payload(): + payload = self.retrieve_payload() + else: + logger.error("payload is not set, is_retrieve_payload is also not set - no payloads") + self.exit_code = -1 + return + + logger.info(f"payload: {payload}") + logger.info("starting ESProcessFineGrainedProc") + proc = self.get_esprocess_finegrainedproc(payload) + self.proc = proc + logger.info("ESProcessFineGrainedProc initialized") + + proc.set_get_event_ranges_hook(self.get_event_ranges) + proc.set_handle_out_message_hook(self.handle_out_message) + + logger.info('ESProcessFineGrainedProc starts to run') + proc.start() + logger.info('ESProcessFineGrainedProc started to run') + + iteration = 0 + while proc.is_alive(): + iteration += 1 + if self.is_stop(): + logger.info(f'stop is set -- stopping process pid={proc.pid}') + proc.stop() + break + self.stageout_es() + + # have we passed the threshold for failed stage-outs? + if self.__stageout_failures >= self.__max_allowed_stageout_failures: + logger.warning(f'too many stage-out failures ({self.__max_allowed_stageout_failures})') + logger.info(f'stopping process pid={proc.pid}') + proc.stop() + break + + exit_code = proc.poll() + if iteration % 60 == 0: + logger.info(f'running: iteration={iteration} pid={proc.pid} exit_code={exit_code}') + time.sleep(5) + + while proc.is_alive(): + time.sleep(1) + logger.info("ESProcess finished") + + self.stageout_es(force=True) + self.clean() + self.exit_code = proc.poll() + logger.info("ESProcess exit_code: %s" % self.exit_code) + + except Exception as exc: + logger.error(f'execute payload failed: {exc}, {traceback.format_exc()}') + self.clean() + self.exit_code = -1 + + logger.info('ES fine grained proc executor finished') diff --git a/pilot/eventservice/workexecutor/workexecutor.py b/pilot/eventservice/workexecutor/workexecutor.py index f9c27182..25f0daa1 100644 --- a/pilot/eventservice/workexecutor/workexecutor.py +++ b/pilot/eventservice/workexecutor/workexecutor.py @@ -75,6 +75,8 @@ def get_plugin_confs(self): plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.hammercloudexecutor.HammerCloudExecutor'} elif self.args['executor_type'] == 'mpi': # network-less plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.mpiexecutor.MPIExecutor'} + elif self.args['executor_type'] == 'fineGrainedProc': + plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.finegrainedprocexecutor.FineGrainedProcExecutor'} else: plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor'} diff --git a/pilot/user/rubin/esprocessfinegrainedproc.py b/pilot/user/rubin/esprocessfinegrainedproc.py new file mode 100644 index 00000000..c3363952 --- /dev/null +++ b/pilot/user/rubin/esprocessfinegrainedproc.py @@ -0,0 +1,890 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Authors: +# - Wen Guan, wen.guan@cern.ch, 2023 - 2024 + +import base64 +import io +import json +import logging +import os +import queue +import re +import signal +import time +import threading +import traceback +from concurrent import futures +from typing import Any + +# from pilot.util.auxiliary import set_pilot_state +from pilot.util.filehandling import read_file +from pilot.common.errorcodes import ErrorCodes +from pilot.common.exception import PilotException, MessageFailure, SetupFailure, RunPayloadFailure +from pilot.util.container import execute + + +logger = logging.getLogger(__name__) +errors = ErrorCodes() + +""" +Main process to handle event service. +It makes use of two hooks get_event_ranges_hook and handle_out_message_hook to communicate with other processes when +it's running. The process will handle the logic of Event service independently. +""" + + +class ESRunnerThreadPool(futures.ThreadPoolExecutor): + def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()): + self.futures = {} + self.outputs = {} + self._lock = threading.RLock() + self.max_workers = max_workers + super(ESRunnerThreadPool, self).__init__(max_workers=max_workers, + thread_name_prefix=thread_name_prefix, + initializer=initializer, + initargs=initargs) + + def submit(self, fn, *args, **kwargs): + future = super(ESRunnerThreadPool, self).submit(fn, *args, **kwargs) + return future + + def run_event(self, fn, event): + future = super(ESRunnerThreadPool, self).submit(fn, event) + with self._lock: + self.futures[event['eventRangeID']] = {'event': event, 'future': future} + + def scan(self): + with self._lock: + for event_range_id in list(self.futures.keys()): + event_future = self.futures[event_range_id] + future = event_future['future'] + if future.done(): + result = future.result() + self.outputs[event_range_id] = {'event': self.futures[event_range_id]['event'], 'result': result} + del self.futures[event_range_id] + + def get_outputs(self): + outputs = [] + with self._lock: + for event_range_id in self.outputs: + outputs.append(self.outputs[event_range_id]['result']) + self.outputs = {} + return outputs + + def get_max_workers(self): + return self.max_workers + + def get_num_running_workers(self): + return len(list(self.futures.keys())) + + def has_free_workers(self): + return self.get_num_workers() < self.max_workers + + def get_num_free_workers(self): + return self.max_workers - self.get_num_running_workers() + + +class ESProcessFineGrainedProc(threading.Thread): + """ + Main EventService Process. + """ + def __init__(self, payload, waiting_time=30 * 60): + """ + Init ESProcessFineGrainedProc. + + :param payload: a dict of {'executable': , 'output_file': , 'error_file': } + """ + threading.Thread.__init__(self, name='esprocessFineGrainedProc') + + self.__payload = payload + + self.__thread_pool = None + + self.get_event_ranges_hook = None + self.handle_out_message_hook = None + + self.__monitor_log_time = None + self.is_no_more_events = False + self.__no_more_event_time = None + self.__waiting_time = waiting_time + self.__stop = threading.Event() + self.__stop_time = 180 + self.pid = None + self.__is_payload_started = False + + self.__ret_code = None + self.setName("ESProcessFineGrainedProc") + self.corecount = 1 + self.event_execution_time = None + + self.rubin_es_map = {} + + self._worker_id = -1 + self._lock = threading.RLock() + + def __del__(self): + if self.__thread_pool: + del self.__thread_pool + + def is_payload_started(self): + return self.__is_payload_started + + def stop(self, delay=1800): + if not self.__stop.is_set(): + self.__stop.set() + self.__stop_set_time = time.time() + self.__stop_delay = delay + self.close_logs() + self.__thread_pool.shutdown(wait=False) + + def get_job_id(self): + if 'job' in self.__payload and self.__payload['job'] and self.__payload['job'].jobid: + return self.__payload['job'].jobid + return '' + + def get_job(self): + if 'job' in self.__payload and self.__payload['job']: + return self.__payload['job'] + return None + + def get_transformation(self): + if 'job' in self.__payload and self.__payload['job'] and self.__payload['job'].transformation: + return self.__payload['job'].transformation + return None + + def get_corecount(self): + try: + if os.environ.get("RUBIN_ES_CORES", None) is not None: + rubin_es_cores = int(os.environ.get("RUBIN_ES_CORES")) + return rubin_es_cores + except Exception as ex: + logger.warn("RUBIN_ES_CORES is not defined correctly: %s" % str(ex)) + + if 'job' in self.__payload and self.__payload['job'] and self.__payload['job'].corecount: + core_count = int(self.__payload['job'].corecount) + return core_count + return 1 + + def get_file(self, workdir, file_label='output_file', file_name='payload.stdout'): + """ + Return the requested file. + + :param file_label: + :param workdir: + :return: + """ + + try: + file_type = file # Python 2 + except NameError: + file_type = io.IOBase # Python 3 + + if file_label in self.__payload: + if isinstance(self.__payload[file_label], file_type): + _file_fd = self.__payload[file_label] + else: + _file = self.__payload[file_label] if '/' in self.__payload[file_label] else os.path.join(workdir, self.__payload[file_label]) + _file_fd = open(_file, 'w') + else: + _file = os.path.join(workdir, file_name) + _file_fd = open(_file, 'w') + + return _file_fd + + def get_workdir(self): + """ + Return the workdir. + If the workdir is set but is not a directory, return None. + + :return: workdir (string or None). + :raises SetupFailure: in case workdir is not a directory. + """ + + workdir = '' + if 'workdir' in self.__payload: + workdir = self.__payload['workdir'] + if not os.path.exists(workdir): + os.makedirs(workdir) + elif not os.path.isdir(workdir): + raise SetupFailure('workdir exists but is not a directory') + return workdir + + def get_executable(self, workdir): + """ + Return the executable string. + + :param workdir: work directory (string). + :return: executable (string). + """ + executable = self.__payload['executable'] + # return 'cd %s; %s' % (workdir, executable) + return executable + + def init_logs(self): + workdir = self.get_workdir() + # logger.info("payload: %s", str(self.__payload)) + output_file_fd = self.get_file(workdir, file_label='output_file', file_name='payload.stdout') + error_file_fd = self.get_file(workdir, file_label='error_file', file_name='payload.stderr') + + self.stdout_queue = queue.Queue() + self.stderr_queue = queue.Queue() + self.stdout_file = output_file_fd + self.stderr_file = error_file_fd + + logger.info("stdout_file: %s; stderr_file: %s" % (self.stdout_file, self.stderr_file)) + + realtime_log_files = os.environ.get('REALTIME_LOGFILES', None) + realtime_log_files = re.split('[:,]', realtime_log_files) + # realtime_log_files = [os.path.join(event_dir, f) for f in realtime_log_files] + self.realtime_log_queues = {} + self.realtime_log_files = {} + for realtime_log_file in realtime_log_files: + self.realtime_log_queues[realtime_log_file] = queue.Queue() + self.realtime_log_files[realtime_log_file] = self.get_file(workdir, file_label=realtime_log_file, file_name=realtime_log_file) + logger.info("realtime log %s: %s" % (realtime_log_file, self.realtime_log_files[realtime_log_file])) + logger.info("self.realtime_log_queues: %s" % str(self.realtime_log_queues)) + + def write_logs_from_queue(self): + while not self.stdout_queue.empty(): + item = self.stdout_queue.get(block=False) + itemb = item.encode('utf-8') + self.stdout_file.write(itemb) + # logger.debug("write stdout_file: %s" % item) + while not self.stderr_queue.empty(): + item = self.stderr_queue.get(block=False) + itemb = item.encode('utf-8') + self.stderr_file.write(itemb) + # logger.debug("write stderr_file: %s" % item) + + for fd in self.realtime_log_queues: + while not self.realtime_log_queues[fd].empty(): + item = self.realtime_log_queues[fd].get(block=False) + self.realtime_log_files[fd].write(json.dumps(item)) + # logger.debug("write realtime log %s: %s" % (fd, item)) + + def close_logs(self): + try: + # cmd = "pwd; ls -ltr" + # execute(cmd, stdout=self.stdout_file, stderr=self.stderr_file, timeout=120) + self.stdout_file.close() + self.stderr_file.close() + for fd in self.realtime_log_files: + self.realtime_log_files[fd].close() + except Exception as ex: + logger.error("Failed to close logs: %s" % str(ex)) + + def set_get_event_ranges_hook(self, hook): + """ + set get_event_ranges hook. + + :param hook: a hook method to get event ranges. + """ + + self.get_event_ranges_hook = hook + + def get_get_event_ranges_hook(self): + """ + get get_event_ranges hook. + + :returns: The hook method to get event ranges. + """ + + return self.get_event_ranges_hook + + def set_handle_out_message_hook(self, hook): + """ + set handle_out_message hook. + + :param hook: a hook method to handle payload output and error messages. + """ + + self.handle_out_message_hook = hook + + def get_handle_out_message_hook(self): + """ + get handle_out_message hook. + + :returns: The hook method to handle payload output and error messages. + """ + + return self.handle_out_message_hook + + def init(self): + """ + initialize message thread and payload process. + """ + + try: + self.init_logs() + self.__thread_pool = ESRunnerThreadPool(max_workers=self.get_corecount(), + thread_name_prefix='ESProcessRunner') + except Exception as e: + # TODO: raise exceptions + self.__ret_code = -1 + self.stop() + raise e + + def try_get_events(self, num_free_workers): + events = [] + if num_free_workers: + queue_factor = 1 + if self.event_execution_time and self.event_execution_time < 10 * 60: # 10 minutes + queue_factor = int(10 * 60 / self.event_execution_time) + events = self.get_event_ranges(num_ranges=num_free_workers, queue_factor=queue_factor) + if not events: + self.is_no_more_events = True + self.__no_more_event_time = time.time() + return events + + def get_event_dir(self, event_range_id): + work_dir = self.get_workdir() + event_dir = os.path.join(work_dir, event_range_id) + if not os.path.exists(event_dir): + os.makedirs(event_dir) + return event_dir + + def get_env_item(self, env, str_item): + items = str_item.replace(" ", ";").split(";") + for item in items: + if env in item: + return item.replace(env, "") + return None + + def get_event_range_map_info(self): + executable = self.get_executable(self.get_workdir()) + exec_list = executable.split(" ") + es_map_env, es_map_file = None, None + for exec_item in exec_list: + new_exec_item = None + if self.is_base64(exec_item): + new_exec_item = self.decode_base64(exec_item) + else: + new_exec_item = exec_item + + if "RUBIN_ES_MAP_FILE=" in new_exec_item: + es_map_file = self.get_env_item("RUBIN_ES_MAP_FILE=", new_exec_item) + if "RUBIN_ES_MAP=" in new_exec_item: + es_map_env = self.get_env_item("RUBIN_ES_MAP=", new_exec_item) + + self.rubin_es_map = {} + if es_map_file: + try: + with open(es_map_file) as f: + rubin_es_map_from_file_content = json.load(f) + self.rubin_es_map.update(rubin_es_map_from_file_content) + except Exception as ex: + logger.error("failed to load RUBIN_ES_MAP_FILE: %s" % str(ex)) + if es_map_env: + try: + rubin_es_map_from_env = json.loads(es_map_env) + self.rubin_es_map.update(rubin_es_map_from_env) + except Exception as ex: + logger.error("failed to load RUBIN_ES_MAP: %s" % str(ex)) + + def get_event_range_file_map(self, event): + if not self.rubin_es_map: + self.get_event_range_map_info() + # input_file = self.__payload['job'].input_file + # return {input_file: event['eventRangeID']} + # label = input_file.split(":")[0] + + lfn = event['LFN'] + label = lfn.split(":")[1] + input_file = lfn.split(":")[2] + input_file_name = label + ":" + input_file + event_base_index = int(input_file.split("_")[1]) + event_index = int(event['startEvent']) + event_abs_index = str(event_base_index + event_index - 1) + if label in self.rubin_es_map and event_abs_index in self.rubin_es_map[label]: + return {input_file_name: self.rubin_es_map[label][event_abs_index]} + return {input_file_name: input_file_name + "^" + str(event_index)} + + def is_base64(self, sb): + try: + if isinstance(sb, str): + sb_bytes = bytes(sb, 'ascii') + elif isinstance(sb, bytes): + sb_bytes = sb + else: + return False + return base64.b64encode(base64.b64decode(sb_bytes)) == sb_bytes + except Exception: + # logger.error("is_base64 %s: %s" % (sb, ex)) + return False + + def decode_base64(self, sb): + try: + if isinstance(sb, str): + sb_bytes = bytes(sb, 'ascii') + elif isinstance(sb, bytes): + sb_bytes = sb + else: + return sb + return base64.b64decode(sb_bytes).decode("utf-8") + except Exception as ex: + logger.error("decode_base64 %s: %s" % (sb, ex)) + return sb + + def encode_base64(self, sb): + try: + if isinstance(sb, str): + sb_bytes = bytes(sb, 'ascii') + elif isinstance(sb, bytes): + sb_bytes = sb + return base64.b64encode(sb_bytes).decode("utf-8") + except Exception as ex: + logger.error("encode_base64 %s: %s" % (sb, ex)) + return sb + + def replace_executable(self, executable, event_range_file_map): + exec_list = executable.split(" ") + new_exec_list = [] + for exec_item in exec_list: + new_exec_item = None + if self.is_base64(exec_item): + new_exec_item = self.decode_base64(exec_item) + for input_file in event_range_file_map: + new_exec_item = new_exec_item.replace(input_file, event_range_file_map[input_file]) + new_exec_item = self.encode_base64(new_exec_item) + else: + new_exec_item = exec_item + for input_file in event_range_file_map: + new_exec_item = new_exec_item.replace(input_file, event_range_file_map[input_file]) + new_exec_list.append(new_exec_item) + return " ".join(new_exec_list) + + def get_event_executable(self, event_dir, event): + executable = self.get_executable(event_dir) + event_range_file_map = self.get_event_range_file_map(event) + executable = self.replace_executable(executable, event_range_file_map) + # executable = "cd " + event_dir + "; " + executable + + transformation = self.get_transformation() + base_transformation = os.path.basename(transformation) + + executable = "cp -f " + base_transformation + " " + event_dir + "; cd " + event_dir + "; " + executable + + stdout_filename = os.path.join(event_dir, "payload.stdout") + stderr_filename = os.path.join(event_dir, "payload.stderr") + + stdout_file = open(stdout_filename, 'a') + stderr_file = open(stderr_filename, 'a') + realtime_log_files = os.environ.get('REALTIME_LOGFILES', None) + realtime_log_files = re.split('[:,]', realtime_log_files) + realtime_log_files = [os.path.join(event_dir, f) for f in realtime_log_files] + return executable, stdout_file, stderr_file, stdout_filename, stderr_filename, realtime_log_files + + def get_worker_id(self): + worker_id = None + with self._lock: + self._worker_id += 1 + worker_id = self._worker_id + return worker_id + + def open_log_file(self, filename, perm='r'): + if os.path.exists(filename): + fd = open(filename, perm) + fd.seek(0) + return fd + return None + + def redirect_logs(self, graceful_stop, worker_id, stdout_filename, stderr_filename, realtime_log_files, event_dir): # noqa C901 + stdout_file = None + stderr_file = None + realtime_logs = {} + for rt in realtime_log_files: + realtime_logs[rt] = None + # logger.debug("self.realtime_log_queues: %s" % str(self.realtime_log_queues)) + while not graceful_stop.is_set(): + try: + if stdout_file is None: + stdout_file = self.open_log_file(stdout_filename) + if stderr_file is None: + stderr_file = self.open_log_file(stderr_filename) + for rt in realtime_logs: + if realtime_logs[rt] is None: + realtime_logs[rt] = self.open_log_file(rt) + + if stdout_file: + # logger.debug("stdout_file location: %s" % stdout_file.tell()) + lines = stdout_file.readlines() + for line in lines: + line = "Worker %s: " % worker_id + line + self.stdout_queue.put(line) + if stderr_file: + lines = stderr_file.readlines() + for line in lines: + line = "Worker %s: " % worker_id + line + self.stderr_queue.put(line) + for rt in realtime_logs: + if realtime_logs[rt]: + lines = realtime_logs[rt].readlines() + rt_base = os.path.basename(rt) + for line in lines: + try: + line = json.loads(line) + line.update({'worker_id': worker_id}) + except Exception: + line = "Worker %s: " % worker_id + line + self.realtime_log_queues[rt_base].put(line) + + time.sleep(0.1) + except Exception as ex: + logger.warn(ex) + logger.debug(traceback.format_exc()) + + try: + # cmd = "cd %s; pwd; ls -ltr" % event_dir + # ls_status, ls_stdout, ls_stderr = execute(cmd, timeout=120) + # logger.info("list files status: %s, output: %s, error: %s" % (ls_status, ls_stdout, ls_stderr)) + + if stdout_file is None: + stdout_file = self.open_log_file(stdout_filename) + if stderr_file is None: + stderr_file = self.open_log_file(stderr_filename) + for rt in realtime_logs: + if realtime_logs[rt] is None: + realtime_logs[rt] = self.open_log_file(rt) + + if stdout_file: + lines = stdout_file.readlines() + for line in lines: + line = "Worker %s: " % worker_id + line + self.stdout_queue.put(line) + stdout_file.close() + if stderr_file: + lines = stderr_file.readlines() + for line in lines: + line = "Worker %s: " % worker_id + line + self.stderr_queue.put(line) + stderr_file.close() + for rt in realtime_logs: + if realtime_logs[rt]: + lines = realtime_logs[rt].readlines() + rt_base = os.path.basename(rt) + for line in lines: + try: + line = json.loads(line) + line.update({'worker_id': worker_id}) + except Exception: + line = "Worker %s: " % worker_id + line + self.realtime_log_queues[rt_base].put(line) + realtime_logs[rt].close() + except Exception as ex: + logger.warn(ex) + logger.debug(traceback.format_exc()) + + def wait_graceful(self, proc: Any) -> int: + """ + Wait for payload process to finish. + + :param proc: subprocess object (Any) + :return: exit code (int). + """ + breaker = False + exit_code = None + iteration = 0 + while True: + time.sleep(0.1) + + iteration += 1 + for _ in range(60): + if self.__stop.is_set(): + breaker = True + logger.info(f'breaking -- sending SIGTERM to pid={proc.pid}') + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + break + exit_code = proc.poll() + if exit_code is not None: + break + time.sleep(1) + if breaker: + logger.info(f'breaking -- sleep 3s before sending SIGKILL pid={proc.pid}') + time.sleep(3) + proc.kill() + break + + exit_code = proc.poll() + + if iteration % 10 == 0: + logger.info(f'running: iteration={iteration} pid={proc.pid} exit_code={exit_code}') + if exit_code is not None: + break + else: + continue + + return exit_code + + def run_event(self, event): + time_start = time.time() + ret = {} + worker_id = self.get_worker_id() + log_prefix = "worker_id=%s: " % worker_id + try: + event_range_id = event['eventRangeID'] + logger.info(log_prefix + "start to run event " + str(event_range_id)) + + event_dir = self.get_event_dir(event_range_id) + executable, stdout_file, stderr_file, stdout_filename, stderr_filename, realtime_log_files = self.get_event_executable(event_dir, event) + logger.info(log_prefix + "executable: " + executable) + logger.info(log_prefix + "stdout: " + stdout_filename) + logger.info(log_prefix + "stderr: " + stderr_filename) + + # exit_code, stdout, stderr = execute(executable, workdir=event_dir, returnproc=True, stdout=stdout_file, stderr=stderr_file, + # cwd=event_dir, timeout=7 * 24 * 3600) + # logger.info(log_prefix + "exit_code: " + str(exit_code)) + # logger.info(log_prefix + "stdout: " + str(stdout)) + # logger.info(log_prefix + "stderr: " + str(stderr)) + try: + proc = execute(executable, returnproc=True, stdout=stdout_file, stderr=stderr_file, timeout=7 * 24 * 3600) + except Exception as error: + logger.error(f'could not execute: {error}') + raise Exception(f'could not execute: {error}') + if isinstance(proc, tuple) and not proc[0]: + logger.error('failed to execute payload') + raise Exception('failed to execute payload') + + logger.info(f'started -- pid={proc.pid} executable={executable}') + # job = self.get_job() + # if job: + # job.pid = proc.pid + # job.pgrp = os.getpgid(job.pid) + # set_pilot_state(job=job, state="running") + + # start a thread to redirect stdout/stderr and realtime logging + graceful_stop = threading.Event() + log_redirect_thread = threading.Thread(target=self.redirect_logs, args=(graceful_stop, worker_id, stdout_filename, stderr_filename, realtime_log_files, event_dir)) + log_redirect_thread.start() + + exit_code = self.wait_graceful(proc) + logger.info(log_prefix + "exit_code: " + str(exit_code)) + stdout_file.close() + stderr_file.close() + + cmd = "cd %s; pwd; ls -ltr" % event_dir + ls_status, ls_stdout, ls_stderr = execute(cmd, timeout=120) + logger.info("list files status: %s, output: %s, error: %s" % (ls_status, ls_stdout, ls_stderr)) + + # log_redirect_thread.stop() + time.sleep(2) + logger.info(log_prefix + "stopping log_redirect_thread") + graceful_stop.set() + + diagnostics = None + if exit_code: + logger.warning(f'payload returned exit code={exit_code}') + stdout = read_file(stdout_filename) + stderr = read_file(stderr_filename) + err_msg = errors.extract_stderr_error(stderr) + if err_msg == "": + err_msg = errors.extract_stderr_warning(stderr) + + diagnostics = stderr + stdout if stdout and stderr else 'General payload setup verification error (check setup logs)' + # check for special errors in thw output + exit_code = errors.resolve_transform_error(exit_code, diagnostics) + # diagnostics = errors.format_diagnostics(exit_code, diagnostics) + + diagnostics = errors.format_diagnostics(exit_code, err_msg) + _, diagnostics = errors.add_error_code(exit_code, msg=diagnostics) + if stdout_file: + stdout_file.close() + logger.debug(f'closed {stdout_filename}') + if stderr_file: + stderr_file.close() + logger.debug(f'closed {stderr_filename}') + if exit_code: + self.__ret_code = exit_code + ret = {'id': event_range_id, 'status': 'failed', 'error_code': exit_code, 'error_diag': diagnostics} + else: + ret = {'id': event_range_id, 'status': 'finished', 'error_code': exit_code, 'error_diag': diagnostics} + except Exception as ex: + logger.error(ex) + logger.error(traceback.format_exc()) + ret = {'id': event_range_id, 'status': 'failed', 'error_code': -1, 'error_diag': str(ex)} + self.__ret_code = -1 + + logger.info(log_prefix + "ret: " + str(ret)) + + time_used = time.time() - time_start + logger.info(log_prefix + "time used to process this event: " + str(time_used)) + + ret['wall_time'] = time_used + + if self.event_execution_time is None or self.event_execution_time < time_used: + self.event_execution_time = time_used + logger.info(log_prefix + "max event execution time: " + str(time_used)) + return ret + + def send_terminate_events(self, outputs): + for output in outputs: + self.handle_out_message(output) + + def monitor(self, terminate=False): + """ + Monitor whether a process is dead. + + raises: RunPayloadFailure: when the payload process is dead or exited. + """ + if self.__thread_pool: + self.__thread_pool.scan() + if not terminate: + num_free_workers = self.__thread_pool.get_num_free_workers() + if num_free_workers > 0: + events = self.try_get_events(num_free_workers) + if events: + logger.info("Got %s events: %s" % (len(events), events)) + for event in events: + # self.run_event(event) + self.__thread_pool.run_event(self.run_event, event) + + outputs = self.__thread_pool.get_outputs() + if outputs: + logger.info("Got %s outputs: %s" % (len(outputs), outputs)) + self.send_terminate_events(outputs) + + def get_event_ranges(self, num_ranges=None, queue_factor=1): + """ + Calling get_event_ranges hook to get event ranges. + + :param num_ranges: number of event ranges to get. + + :raises: SetupFailure: If get_event_ranges_hook is not set. + MessageFailure: when failed to get event ranges. + """ + if not num_ranges: + num_ranges = self.corecount + + logger.debug('getting event ranges(num_ranges=%s)' % num_ranges) + if not self.get_event_ranges_hook: + raise SetupFailure("get_event_ranges_hook is not set") + + try: + logger.debug('calling get_event_ranges hook(%s) to get event ranges.' % self.get_event_ranges_hook) + event_ranges = self.get_event_ranges_hook(num_ranges, queue_factor) + logger.debug('got event ranges: %s' % event_ranges) + return event_ranges + except Exception as e: + raise MessageFailure("Failed to get event ranges: %s" % e) + + def parse_out_message(self, message): + """ + Parse output or error messages from payload. + + :param message: The message string received from payload. + + :returns: a dict {'id': , 'status': , 'output': , 'cpu': , 'wall': , 'message': } + :raises: PilotExecption: when a PilotException is caught. + UnknownException: when other unknown exception is caught. + """ + + logger.debug('parsing message: %s' % message) + return message + + def handle_out_message(self, message): + """ + Handle output or error messages from payload. + Messages from payload will be parsed and the handle_out_message hook is called. + + :param message: The message string received from payload. + + :raises: SetupFailure: when handle_out_message_hook is not set. + RunPayloadFailure: when failed to handle an output or error message. + """ + + logger.debug('handling out message: %s' % message) + if not self.handle_out_message_hook: + raise SetupFailure("handle_out_message_hook is not set") + + try: + message_status = self.parse_out_message(message) + logger.debug('parsed out message: %s' % message_status) + logger.debug('calling handle_out_message hook(%s) to handle parsed message.' % self.handle_out_message_hook) + self.handle_out_message_hook(message_status) + except Exception as e: + raise RunPayloadFailure("Failed to handle out message: %s" % e) + + def is_payload_running(self): + """ + Check whether the payload is still running + + :return: True if the payload is running, otherwise False + """ + if (self.__stop.is_set() or self.is_no_more_events) and self.__thread_pool.get_num_running_workers() < 1: + return False + return True + + def poll(self): + """ + poll whether the process is still running. + + :returns: None: still running. + 0: finished successfully. + others: failed. + """ + # if self.is_payload_running(): + # return None + logger.debug("is_alive: %s, ret_code:%s" % (self.is_alive(), self.__ret_code)) + # if self.is_alive(): + # return None + return self.__ret_code + + def clean(self): + """ + Clean left resources + """ + self.stop() + if self.__ret_code is None: + self.__ret_code = 0 + + def run(self): + """ + Main run loops: monitor message thread and payload process. + handle messages from payload and response messages with injecting new event ranges or process outputs. + + :raises: PilotExecption: when a PilotException is caught. + UnknownException: when other unknown exception is caught. + """ + + self.__is_payload_started = True + logger.info('start esprocess with thread ident: %s' % (self.ident)) + logger.debug('initializing') + self.init() + logger.debug('initialization finished.') + + logger.info('starts to main loop') + while self.is_payload_running(): + try: + self.monitor() + self.write_logs_from_queue() + time.sleep(0.01) + except PilotException as e: + logger.error('PilotException caught in the main loop: %s, %s' % (e.get_detail(), traceback.format_exc())) + # TODO: define output message exception. If caught 3 output message exception, terminate + self.stop() + except Exception as e: + logger.error('Exception caught in the main loop: %s, %s' % (e, traceback.format_exc())) + # TODO: catch and raise exceptions + # if catching dead process exception, terminate. + self.stop() + break + logger.info("main loop ends") + self.monitor(terminate=True) + self.write_logs_from_queue() + self.clean() + logger.debug('main loop finished')