diff --git a/PILOTVERSION b/PILOTVERSION index ca88450d..bb737956 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.7.2.4 \ No newline at end of file +3.7.3.84 \ No newline at end of file diff --git a/pilot.py b/pilot.py index 6f710d06..915e3c8c 100755 --- a/pilot.py +++ b/pilot.py @@ -36,13 +36,13 @@ from pilot.common.errorcodes import ErrorCodes from pilot.common.exception import PilotException -from pilot.util.config import config from pilot.info import infosys from pilot.util.auxiliary import ( pilot_version_banner, shell_exit_code, convert_signal_to_exit_code ) +from pilot.util.config import config from pilot.util.constants import ( get_pilot_version, SUCCESS, @@ -53,6 +53,10 @@ SERVER_UPDATE_NOT_DONE, PILOT_MULTIJOB_START_TIME, ) +from pilot.util.cvmfs import ( + is_cvmfs_available, + get_last_update +) from pilot.util.filehandling import ( get_pilot_work_dir, mkdirs, @@ -71,8 +75,12 @@ from pilot.util.networking import dump_ipv6_info from pilot.util.processgroups import find_defunct_subprocesses from pilot.util.timing import add_to_pilot_timing +from pilot.util.workernode import get_node_name errors = ErrorCodes() +mainworkdir = "" +args = None +trace = None def main() -> int: @@ -112,6 +120,15 @@ def main() -> int: "started", args.queue, args.url, args.port, logger, "IPv6" ) # note: assuming IPv6, fallback in place + # check cvmfs if available + if is_cvmfs_available() is True: # ignore None, False is handled in function + timestamp = get_last_update() + if timestamp and timestamp > 0: + logger.info('CVMFS has been validated') + else: + logger.warning('CVMFS is not responding - aborting pilot') + return errors.CVMFSISNOTALIVE + if not args.rucio_host: args.rucio_host = config.Rucio.host @@ -798,6 +815,7 @@ def send_worker_status( data["harvesterID"] = os.environ.get("HARVESTER_ID", None) data["status"] = status data["site"] = queue + data["node_id"] = get_node_name() # attempt to send the worker info to the server if data["workerID"] and 
data["harvesterID"]: diff --git a/pilot/api/analytics.py b/pilot/api/analytics.py index d25a237c..34ca9ef6 100644 --- a/pilot/api/analytics.py +++ b/pilot/api/analytics.py @@ -59,8 +59,8 @@ def fit(self, x: list, y: list, model: str = "linear") -> Any: """ try: self._fit = Fit(x=x, y=y, model=model) - except Exception as e: - raise UnknownException(e) + except Exception as exc: + raise UnknownException(exc) from exc return self._fit @@ -71,12 +71,10 @@ def slope(self) -> float: :raises NotDefined: exception thrown if fit is not defined. :return: slope (float). """ - if self._fit: - slope = self._fit.slope() - else: + if not self._fit: raise NotDefined("Fit has not been defined") - return slope + return self._fit.slope() def intersect(self) -> float: """ @@ -85,12 +83,10 @@ def intersect(self) -> float: :raises NotDefined: exception thrown if fit is not defined :return: intersect (float). """ - if self._fit: - intersect = self._fit.intersect() - else: + if not self._fit: raise NotDefined("Fit has not been defined") - return intersect + return self._fit.intersect() def chi2(self) -> float: """ @@ -99,12 +95,10 @@ def chi2(self) -> float: :raises NotDefined: exception thrown if fit is not defined :return: chi2 (float). """ - if self._fit: - x2 = self._fit.chi2() - else: + if not self._fit: raise NotDefined("Fit has not been defined") - return x2 + return self._fit.chi2() def get_table(self, filename: str, header: str = "", separator: str = "\t", convert_to_float: bool = True) -> dict: """ @@ -139,7 +133,8 @@ def get_fitted_data( :return: {"slope": slope, "chi2": chi2} (dict). 
""" slope = "" - chi2 = "" + intersect = "" + _chi2 = "" table = self.get_table(filename) if table: @@ -198,24 +193,19 @@ def get_fitted_data( fit = self.fit(x, y) _slope = self.slope() except Exception as exc: - logger.warning( - "failed to fit data, x=%s, y=%s: %s", str(x), str(y), exc - ) + logger.warning(f"failed to fit data, x={x}, y={y}: {exc}") else: if _slope: - slope = float_to_rounded_string( - fit.slope(), precision=precision - ) - chi2 = float_to_rounded_string(fit.chi2(), precision=precision) + slope = float_to_rounded_string(fit.slope(), precision=precision) + fit.set_intersect() + intersect = float_to_rounded_string(fit.intersect(), precision=precision) + _chi2 = float_to_rounded_string(fit.chi2(), precision=precision) if slope != "": logger.info( - "current memory leak: %s B/s (using %d data points, chi2=%s)", - slope, - len(x), - chi2, + f"current memory leak: {slope} B/s (using {len(x)} data points, chi2={_chi2})" ) - return {"slope": slope, "chi2": chi2} + return {"slope": slope, "chi2": _chi2, "intersect": intersect} def find_limit( self, _x, _y, _chi2_org, norg, change_limit=0.25, edge="right", steps=5 @@ -244,8 +234,8 @@ def find_limit( if change < change_limit: found = True break - else: - _chi2_prev = _chi2 + + _chi2_prev = _chi2 if edge == "right": if not found: @@ -254,13 +244,12 @@ def find_limit( else: limit = len(_x) - 1 logger.info(f"right removable region: {limit}") + elif not found: + limit = 0 + logger.info("left removable region not found") else: - if not found: - limit = 0 - logger.info("left removable region not found") - else: - limit = iterations * 10 - logger.info(f"left removable region: {limit}") + limit = iterations * 10 + logger.info(f"left removable region: {limit}") return limit @@ -293,7 +282,7 @@ def extract_from_table(self, table, x_name, y_name): return x, y -class Fit(object): +class Fit(): """Low-level fitting class.""" _model = "linear" # fitting model @@ -325,15 +314,22 @@ def __init__(self, **kwargs): if 
len(self._x) != len(self._y): raise NotSameLength("input data (lists) have different lengths") + logger.info(f'model: {self._model}, x: {self._x}, y: {self._y}') # base calculations if self._model == "linear": self._ss = sum_square_dev(self._x) + logger.info("sum of square deviations: %s", self._ss) self._ss2 = sum_dev(self._x, self._y) + logger.info("sum of deviations: %s", self._ss2) self.set_slope() self._xm = mean(self._x) + logger.info("mean x: %s", self._xm) self._ym = mean(self._y) + logger.info("mean y: %s", self._ym) self.set_intersect() + logger.info("intersect: %s", self._intersect) self.set_chi2() + logger.info("chi2: %s", self._chi2) else: logger.warning("'%s' model is not implemented", self._model) raise NotImplementedError() @@ -407,8 +403,10 @@ def set_intersect(self): """ if self._ym and self._slope and self._xm: self._intersect = self._ym - self._slope * self._xm + logger.info("-- intersect: %s", self._intersect) else: self._intersect = None + logger.info("could not calculate intersect") def intersect(self): """ diff --git a/pilot/api/data.py b/pilot/api/data.py index 1905c125..b62df256 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -213,6 +213,25 @@ def prepare_inputddms(self, files, activities=None): if not fdat.inputddms and fdat.ddmendpoint: fdat.inputddms = [fdat.ddmendpoint] + def print_replicas(self, replicas, label='unsorted'): + """ + Print replicas. + + :param replicas: list of replicas (Any) + :param label: label (str). + """ + number = 1 + maxnumber = 10 + self.logger.info(f'{label} list of replicas: (max {maxnumber})') + for pfn, xdat in replicas: + self.logger.debug(f"{number}. 
" + f"lfn={pfn}, " + f"rse={xdat.get('ddmendpoint')}, " + f"domain={xdat.get('domain')}") + number += 1 + if number > maxnumber: + break + @classmethod def sort_replicas(self, replicas, inputddms): """ @@ -242,7 +261,7 @@ def sort_replicas(self, replicas, inputddms): continue xreplicas.append((pfn, xdat)) - return replicas + return xreplicas def resolve_replicas(self, files, use_vp=False): """ @@ -369,7 +388,9 @@ def add_replicas(self, fdat, replica): sorted_replicas = sorted(iter(list(replica.get('pfns', {}).items())), key=lambda x: x[1]['priority']) # prefer replicas from inputddms first + #self.print_replicas(sorted_replicas) xreplicas = self.sort_replicas(sorted_replicas, fdat.inputddms) + self.print_replicas(xreplicas) for pfn, xdat in xreplicas: @@ -494,7 +515,7 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901 raise PilotException('failed to resolve copytool by preferred activities=%s, acopytools=%s' % (activity, self.acopytools)) - # populate inputddms if need + # populate inputddms if needed self.prepare_inputddms(files) # initialize ddm_activity name for requested files if not set @@ -839,7 +860,9 @@ def transfer_files(self, copytool, files, activity=None, **kwargs): # noqa: C90 # prepare files (resolve protocol/transfer url) if getattr(copytool, 'require_input_protocols', False) and files: - self.require_protocols(files, copytool, activity, local_dir=kwargs.get('input_dir')) + args = kwargs.get('args') + input_dir = kwargs.get('input_dir') if not args else args.input_dir + self.require_protocols(files, copytool, activity, local_dir=input_dir) # mark direct access files with status=remote_io self.set_status_for_direct_access(files, kwargs.get('workdir', '')) diff --git a/pilot/common/errorcodes.py b/pilot/common/errorcodes.py index 86801ce8..4f059612 100644 --- a/pilot/common/errorcodes.py +++ b/pilot/common/errorcodes.py @@ -177,6 +177,7 @@ class ErrorCodes: REMOTEFILEDICTDOESNOTEXIST = 1374 LEASETIME = 1375 LOGCREATIONTIMEOUT = 
1376 + CVMFSISNOTALIVE = 1377 _error_messages = { GENERALERROR: "General pilot error, consult batch log", @@ -315,7 +316,8 @@ class ErrorCodes: CERTIFICATEHASEXPIRED: "Certificate has expired", REMOTEFILEDICTDOESNOTEXIST: "Remote file open dictionary does not exist", LEASETIME: "Lease time is up", # internal use only - LOGCREATIONTIMEOUT: "Log file creation timed out" + LOGCREATIONTIMEOUT: "Log file creation timed out", + CVMFSISNOTALIVE: "CVMFS is not responding" } put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181] diff --git a/pilot/control/data.py b/pilot/control/data.py index 121e6263..9c8577ff 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -19,7 +19,7 @@ # Authors: # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2024 # - Wen Guan, wen.guan@cern.ch, 2018 # - Alexey Anisenkov, anisyonk@cern.ch, 2018 @@ -212,6 +212,29 @@ def create_trace_report(job: Any, label: str = 'stage-in') -> Any: return trace_report +def get_stagein_client(job: Any, args: Any, label: str = 'stage-in') -> (Any, str): + """ + Return the proper stage-in client. + + :param job: job object (Any) + :param args: pilot args object (Any) + :param label: 'stage-in' (str) + :return: stage-in client (StageInClient), copytool activity (str). + """ + # create the trace report + trace_report = create_trace_report(job, label=label) + + if job.is_eventservicemerge: + client = StageInESClient(job.infosys, logger=logger, trace_report=trace_report) + activity = 'es_events_read' + else: + client = StageInClient(job.infosys, logger=logger, trace_report=trace_report, + ipv=args.internet_protocol_version, workdir=job.workdir) + activity = 'pr' + + return client, activity + + def _stage_in(args: Any, job: Any) -> bool: """ Call the stage-in client. 
@@ -242,10 +265,9 @@ def _stage_in(args: Any, job: Any) -> bool: logger.info('stage-in will be done in a container') try: eventtype, localsite, remotesite = get_trace_report_variables(job, label=label) - pilot.util.middleware.containerise_middleware(job, job.indata, args.queue, eventtype, localsite, remotesite, - job.infosys.queuedata.container_options, args.input_dir, - label=label, container_type=job.infosys.queuedata.container_type.get("middleware"), - rucio_host=args.rucio_host) + pilot.util.middleware.containerise_middleware(job, args, job.indata, eventtype, localsite, remotesite, + job.infosys.queuedata.container_options, label=label, + container_type=job.infosys.queuedata.container_type.get("middleware")) except PilotException as error: logger.warning('stage-in containerisation threw a pilot exception: %s', error) except Exception as error: @@ -256,22 +278,23 @@ def _stage_in(args: Any, job: Any) -> bool: try: logger.info('stage-in will not be done in a container') - # create the trace report - trace_report = create_trace_report(job, label=label) - - if job.is_eventservicemerge: - client = StageInESClient(job.infosys, logger=logger, trace_report=trace_report) - activity = 'es_events_read' - else: - client = StageInClient(job.infosys, logger=logger, trace_report=trace_report, ipv=args.internet_protocol_version, workdir=job.workdir) - activity = 'pr' + client, activity = get_stagein_client(job, args, label) use_pcache = job.infosys.queuedata.use_pcache + # get the proper input file destination (normally job.workdir unless stager workflow) jobworkdir = job.workdir # there is a distinction for mv copy tool on ND vs non-ATLAS workdir = get_proper_input_destination(job.workdir, args.input_destination_dir) - kwargs = dict(workdir=workdir, cwd=job.workdir, usecontainer=False, use_pcache=use_pcache, use_bulk=False, - input_dir=args.input_dir, use_vp=job.use_vp, catchall=job.infosys.queuedata.catchall, - checkinputsize=True, rucio_host=args.rucio_host, 
jobworkdir=jobworkdir) + kwargs = {'workdir': workdir, + 'cwd': job.workdir, + 'usecontainer': False, + 'use_pcache': use_pcache, + 'use_bulk': False, + 'use_vp': job.use_vp, + 'catchall': job.infosys.queuedata.catchall, + 'checkinputsize': True, + 'rucio_host': args.rucio_host, + 'jobworkdir': jobworkdir, + 'args': args} client.prepare_sources(job.indata) client.transfer(job.indata, activity=activity, **kwargs) except PilotException as error: @@ -819,8 +842,7 @@ def get_tar_timeout(dirsize: float) -> int: return min(timeout, timeout_max) -def _do_stageout(job: Any, xdata: list, activity: list, queue: str, title: str, output_dir: str = '', - rucio_host: str = '', ipv: str = 'IPv6') -> bool: +def _do_stageout(job: Any, args: Any, xdata: list, activity: list, title: str, ipv: str = 'IPv6') -> bool: """ Use the `StageOutClient` in the Data API to perform stage-out. @@ -828,12 +850,10 @@ def _do_stageout(job: Any, xdata: list, activity: list, queue: str, title: str, --rucio-host. :param job: job object (Any) + :param args: pilot args object (Any) :param xdata: list of FileSpec objects (list) :param activity: copytool activity or preferred list of activities to resolve copytools (list) - :param queue: PanDA queue (str) :param title: type of stage-out (output, log) (str) - :param output_dir: optional output directory (str) - :param rucio_host: optional rucio host (str) :param ipv: internet protocol version (str) :return: True in case of success transfers, False otherwise (bool). 
""" @@ -858,11 +878,9 @@ def _do_stageout(job: Any, xdata: list, activity: list, queue: str, title: str, logger.info('stage-out will be done in a container') try: eventtype, localsite, remotesite = get_trace_report_variables(job, label=label) - pilot.util.middleware.containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite, - job.infosys.queuedata.container_options, output_dir, - label=label, - container_type=job.infosys.queuedata.container_type.get("middleware"), - rucio_host=rucio_host) + pilot.util.middleware.containerise_middleware(job, args, xdata, eventtype, localsite, remotesite, + job.infosys.queuedata.container_options, label=label, + container_type=job.infosys.queuedata.container_type.get("middleware")) except PilotException as error: logger.warning('stage-out containerisation threw a pilot exception: %s', error) except Exception as error: @@ -875,8 +893,8 @@ def _do_stageout(job: Any, xdata: list, activity: list, queue: str, title: str, trace_report = create_trace_report(job, label=label) client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir) - kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job, output_dir=output_dir, - catchall=job.infosys.queuedata.catchall, rucio_host=rucio_host) #, mode='stage-out') + kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job, output_dir=args.output_dir, + catchall=job.infosys.queuedata.catchall, rucio_host=args.rucio_host) #, mode='stage-out') # prod analy unification: use destination preferences from PanDA server for unified queues if job.infosys.queuedata.type != 'unified': client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow) @@ -938,8 +956,8 @@ def _stage_out_new(job: Any, args: Any) -> bool: job.stageout = 'log' if job.stageout != 'log': ## do stage-out output files - if not _do_stageout(job, job.outdata, 
['pw', 'w'], args.queue, title='output', output_dir=args.output_dir, - rucio_host=args.rucio_host, ipv=args.internet_protocol_version): + if not _do_stageout(job, args, job.outdata, ['pw', 'w'], title='output', + ipv=args.internet_protocol_version): is_success = False logger.warning('transfer of output file(s) failed') @@ -982,8 +1000,8 @@ def _stage_out_new(job: Any, args: Any) -> bool: # write time stamps to pilot timing file add_to_pilot_timing(job.jobid, PILOT_POST_LOG_TAR, time.time(), args) - if not _do_stageout(job, [logfile], ['pl', 'pw', 'w'], args.queue, title='log', output_dir=args.output_dir, - rucio_host=args.rucio_host, ipv=args.internet_protocol_version): + if not _do_stageout(job, args, [logfile], ['pl', 'pw', 'w'], title='log', + ipv=args.internet_protocol_version): is_success = False logger.warning('log transfer failed') job.status['LOG_TRANSFER'] = LOG_TRANSFER_FAILED diff --git a/pilot/control/job.py b/pilot/control/job.py index b0cd32d0..7a42206d 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -17,9 +17,9 @@ # under the License. 
# # Authors: -# - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017 +# - Mario Lassnig, mario.lassnig@cern.ch, 2016-17 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2024 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24 # - Wen Guan, wen.guan@cern.ch, 2018 """Job module with functions for job handling.""" @@ -495,7 +495,10 @@ def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, i while trial <= max_trials: try: # open connection - ret = https.request(f'{pandaserver}/server/panda/getStatus', data=data) + ret = https.request2(f'{pandaserver}/server/panda/getStatus', data=data) + logger.debug(f"request2 response: {ret}") + if not ret: + ret = https.request(f'{pandaserver}/server/panda/getStatus', data=data) response = ret[1] logger.info(f"response: {response}") if response: @@ -740,7 +743,7 @@ def get_data_structure(job: Any, state: str, args: Any, xml: str = "", metadata: if constime and constime != -1: data['cpuConsumptionTime'] = constime data['cpuConversionFactor'] = job.cpuconversionfactor - cpumodel = get_cpu_model() + cpumodel = get_cpu_model() # ARM info will be corrected below if necessary (otherwise cpumodel will contain UNKNOWN) cpumodel = get_cpu_cores(cpumodel) # add the CPU cores if not present data['cpuConsumptionUnit'] = job.cpuconsumptionunit + "+" + cpumodel @@ -760,6 +763,9 @@ def get_data_structure(job: Any, state: str, args: Any, xml: str = "", metadata: if cpu_arch: logger.debug(f'cpu arch={cpu_arch}') data['cpu_architecture_level'] = cpu_arch + # correct the cpuConsumptionUnit on ARM since cpumodel and cache won't be reported + if cpu_arch.startswith('ARM') and 'UNKNOWN' in data['cpuConsumptionUnit']: + data['cpuConsumptionUnit'] = data['cpuConsumptionUnit'].replace('UNKNOWN', 'ARM') # add memory information if available add_memory_info(data, job.workdir, name=job.memorymonitor) @@ -784,6 +790,9 @@ def get_data_structure(job: Any, state: str, args: Any, xml: str = "", metadata: 
extra = {'readbyterate': readfrac} else: logger.debug('read_bytes info not yet available') + # extract and remove any GPU info from data since it will be reported with job metrics + add_gpu_info(data, extra) + job_metrics = get_job_metrics(job, extra=extra) if job_metrics: data['jobMetrics'] = job_metrics @@ -796,6 +805,32 @@ def get_data_structure(job: Any, state: str, args: Any, xml: str = "", metadata: return data +def add_gpu_info(data: dict, extra: dict): + """ + Add GPU info to the extra dictionary. + + :param data: data dictionary (dict) + :param extra: extra dictionary (dict) + """ + if 'GPU' in data: + ngpu = 0 + name = "" + try: + logger.debug(f'data[GPU]={data["GPU"]}') + for key in data["GPU"]: + if key.startswith('gpu_0'): # ignore any further GPU info, ie assume they are all the same + name = data["GPU"][key]['name'].replace(' ', '_') # NVIDIA A100-SXM4-40GB -> NVIDIA_A100-SXM4-40GB + elif key == 'nGPU': + ngpu = data["GPU"][key] + if name: + extra['GPU_name'] = name + if ngpu: + extra['nGPU'] = ngpu + del data['GPU'] + except Exception as exc: + logger.warning(f'exception caught: {exc}') + + def process_debug_mode(job: Any) -> str: """ Handle debug mode - preprocess debug command, get the output and kill the payload in case of gdb. 
@@ -1632,7 +1667,10 @@ def get_job_definition_from_server(args: Any, taskid: str = "") -> str: cmd = https.get_server_command(args.url, args.port) if cmd != "": logger.info(f'executing server command: {cmd}') - res = https.request(cmd, data=data) + res = https.request2(cmd, data=data) # will be a dictionary + logger.debug(f"request2 response: {res}") # should be StatusCode=0 if all is ok + if not res: # fallback to curl solution + res = https.request(cmd, data=data) return res @@ -2091,7 +2129,8 @@ def retrieve(queues: Any, traces: Any, args: Any): # noqa: C901 if args.graceful_stop.is_set(): break time.sleep(1) - elif 'StatusCode' in res and res['StatusCode'] != '0' and res['StatusCode'] != 0: + elif ((isinstance(res, str) and res.startswith('StatusCode') and not res.startswith('StatusCode=0') or + (isinstance(res, dict) and 'StatusCode' in res and res['StatusCode'] != '0' and res['StatusCode'] != 0))): # it seems the PanDA server returns StatusCode as an int, but the aCT returns it as a string # note: StatusCode keyword is not available in job definition files from Harvester (not needed) getjob_failures += 1 @@ -2100,7 +2139,7 @@ def retrieve(queues: Any, traces: Any, args: Any): # noqa: C901 args.graceful_stop.set() break - logger.warning(f"did not get a job -- sleep 60s and repeat -- status: {res['StatusCode']}") + logger.warning(f"did not get a job -- sleep 60s and repeat -- status: {res}") for _ in range(60): if args.graceful_stop.is_set(): break diff --git a/pilot/copytool/rucio.py b/pilot/copytool/rucio.py index 8bc62c8c..8cd2dcdf 100644 --- a/pilot/copytool/rucio.py +++ b/pilot/copytool/rucio.py @@ -20,7 +20,7 @@ # Authors: # - Tobias Wegner, tobias.wegner@cern.ch, 2017-2018 # - Alexey Anisenkov, anisyonk@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2024 # - Tomas Javurek, tomas.javurek@cern.ch, 2019 # - Tomas Javurek, tomas.javurek@cern.ch, 2019 # - David Cameron, david.cameron@cern.ch, 
2019 @@ -105,6 +105,7 @@ def copy_in(files: list, **kwargs: dict) -> list: trace_report = kwargs.get('trace_report') use_pcache = kwargs.get('use_pcache') rucio_host = kwargs.get('rucio_host', '') + pilot_args = kwargs.get('args') # don't spoil the output, we depend on stderr parsing os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]' @@ -112,6 +113,14 @@ def copy_in(files: list, **kwargs: dict) -> list: # note, env vars might be unknown inside middleware contrainers, if so get the value already in the trace report localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite')) for fspec in files: + # check if we should abort + if pilot_args and pilot_args.graceful_stop and pilot_args.graceful_stop.is_set(): + msg = f'copytool has detected graceful stop - will abort stage-in ({pilot_args.mainworkdir})' + logger.warning(msg) + raise PilotException(msg) + if not pilot_args: + logger.warning('pilot_args not set, cannot check for graceful stop') + logger.info(f'rucio copytool, downloading file with scope:{fspec.scope} lfn:{fspec.lfn}') # update the trace report localsite = localsite if localsite else fspec.ddmendpoint diff --git a/pilot/eventservice/communicationmanager/communicationmanager.py b/pilot/eventservice/communicationmanager/communicationmanager.py index c2c3332c..a7087ba9 100644 --- a/pilot/eventservice/communicationmanager/communicationmanager.py +++ b/pilot/eventservice/communicationmanager/communicationmanager.py @@ -18,7 +18,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2023-24 """Main classes to manage the messages between ES and harvester/ACT/Panda.""" @@ -39,6 +39,8 @@ class CommunicationResponse: """Communication response class.""" + exception = None + def __init__(self, attrs: dict = None): """ Initialize variables. 
@@ -51,13 +53,12 @@ def __init__(self, attrs: dict = None): attrs = json.loads(attrs) def_attrs = {'status': None, 'content': None, 'exception': None} - - for key in def_attrs: + for key, value in def_attrs.items(): if key not in attrs: - attrs[key] = def_attrs[key] + attrs[key] = value - for key in attrs: - setattr(self, key, attrs[key]) + for key, value in attrs.items(): + setattr(self, key, value) def __str__(self) -> str: """ @@ -66,8 +67,8 @@ def __str__(self) -> str: :return: string representation (str). """ json_str = {} - for key, value in list(self.__dict__.items()): # Python 2/3 - if value and type(value) is list: + for key, value in list(self.__dict__.items()): + if value and isinstance(value, list): json_str[key] = [] for list_item in value: json_str[key].append(str(list_item)) @@ -81,6 +82,9 @@ def __str__(self) -> str: class CommunicationRequest(): """Communication request class.""" + post_hook = None + response = None + class RequestType(): """Request type class.""" @@ -109,12 +113,13 @@ def __init__(self, attrs: dict = None): if attrs['request_type'] == CommunicationRequest.RequestType.UpdateJobs: def_attrs = {'jobs': None, 'post_hook': None, 'response': None} - for key in def_attrs: + for key, value in def_attrs.items(): if key not in attrs: - attrs[key] = def_attrs[key] + attrs[key] = value for key in attrs: setattr(self, key, attrs[key]) + self.abort = False def __str__(self): @@ -124,8 +129,8 @@ def __str__(self): :return: string representation (str). """ json_str = {} - for key, value in list(self.__dict__.items()): # Python 2/3 - if value and type(value) is list: + for key, value in list(self.__dict__.items()): + if value and isinstance(value, list): json_str[key] = [] for list_item in value: json_str[key].append(str(list_item)) @@ -147,12 +152,12 @@ def __init__(self, *args, **kwargs): :param args: args object (Any) :param kwargs: kwargs dictionary (dict). 
""" - super(CommunicationManager, self).__init__() + super().__init__() PluginFactory.__init__(self, *args, **kwargs) - self.setName("CommunicationManager") + self.name = "CommunicationManager" self.post_get_jobs = None self.post_get_event_ranges_hook = None - self.queues = {'request_get_jobs': queue.Queue(), # Python 2/3 + self.queues = {'request_get_jobs': queue.Queue(), 'update_jobs': queue.Queue(), 'request_get_events': queue.Queue(), 'update_events': queue.Queue(), @@ -200,22 +205,22 @@ def get_jobs(self, njobs: int = 1, post_hook: Any = None, args: Any = None) -> A req_attrs = {} if args: - if not type(args) is dict: + if not isinstance(args, dict): args = vars(args) - for key, value in list(args.items()): # Python 2/3 + for key, value in list(args.items()): req_attrs[key] = value other_req_attrs = {'request_type': CommunicationRequest.RequestType.RequestJobs, 'num_jobs': njobs, 'post_hook': post_hook} - for key, value in list(other_req_attrs.items()): # Python 2/3 + for key, value in list(other_req_attrs.items()): req_attrs[key] = value req = CommunicationRequest(req_attrs) self.queues['request_get_jobs'].put(req) if req.post_hook: - return + return None while req.response is None: time.sleep(1) @@ -223,8 +228,8 @@ def get_jobs(self, njobs: int = 1, post_hook: Any = None, args: Any = None) -> A raise req.response.exception if req.response.status is False: return None - else: - return req.response.content + + return req.response.content def update_jobs(self, jobs: Any, post_hook: Any = None) -> Any: """ @@ -248,7 +253,7 @@ def update_jobs(self, jobs: Any, post_hook: Any = None) -> Any: self.queues['update_jobs'].put(req) if req.post_hook: - return + return None while req.response is None: time.sleep(1) @@ -256,8 +261,8 @@ def update_jobs(self, jobs: Any, post_hook: Any = None) -> Any: raise req.response.exception if req.response.status is False: return None - else: - return req.response.content + + return req.response.content def get_event_ranges(self, 
num_event_ranges: int = 1, post_hook: Any = None, job: Any = None) -> Any: """ @@ -277,9 +282,12 @@ def get_event_ranges(self, num_event_ranges: int = 1, post_hook: Any = None, job if not job: resp_attrs = {'status': -1, 'content': None, - 'exception': exception.CommunicationFailure(f"Get events failed because job info missing(job: {job})")} + 'exception': exception.CommunicationFailure(f"get events failed because job info missing " + f"(job: {job})")} resp = CommunicationResponse(resp_attrs) - raise resp.exception + if resp.exception is not None: + raise resp.exception + raise exception.CommunicationFailure(f"get events failed because job info missing (job: {job})") req_attrs = {'request_type': CommunicationRequest.RequestType.RequestEvents, 'num_event_ranges': num_event_ranges, @@ -293,7 +301,7 @@ def get_event_ranges(self, num_event_ranges: int = 1, post_hook: Any = None, job self.queues['request_get_events'].put(req) if req.post_hook: - return + return None while req.response is None: time.sleep(1) @@ -301,8 +309,8 @@ def get_event_ranges(self, num_event_ranges: int = 1, post_hook: Any = None, job raise req.response.exception if req.response.status is False: return None - else: - return req.response.content + + return req.response.content def update_events(self, update_events: Any, post_hook: Any = None) -> Any: """ @@ -353,7 +361,7 @@ def get_plugin_confs(self) -> dict: plugin_confs = {'class': 'pilot.eventservice.communicationmanager.plugins.pandacommunicator.PandaCommunicator'} if self.args: - for key, value in list(vars(self.args).items()): # Python 2/3 + for key, value in list(vars(self.args).items()): plugin_confs[key] = value return plugin_confs @@ -381,38 +389,45 @@ def can_process_request(self, processor: dict, process_type: str) -> bool: return False - def run(self): - """Handle communication requests.""" + def get_processor(self) -> dict: + """ + Get processor dictionary. + + :return: processor dictionary (dict). 
+ """ confs = self.get_plugin_confs() logger.info(f"communication plugin confs: {confs}") communicator = self.get_plugin(confs) - logger.info(f"communicator: {communicator}") - - processor = {'request_get_jobs': {'pre_check': communicator.pre_check_get_jobs, - 'handler': communicator.request_get_jobs, - 'next_queue': 'processing_get_jobs', - 'process_req_post_hook': False}, - 'request_get_events': {'pre_check': communicator.pre_check_get_events, - 'handler': communicator.request_get_events, - 'next_queue': 'processing_get_events', - 'process_req_post_hook': False}, - 'update_jobs': {'pre_check': communicator.pre_check_update_jobs, - 'handler': communicator.update_jobs, - 'next_queue': None, - 'process_req_post_hook': True}, - 'update_events': {'pre_check': communicator.pre_check_update_events, - 'handler': communicator.update_events, - 'next_queue': None, - 'process_req_post_hook': True}, - 'processing_get_jobs': {'pre_check': communicator.check_get_jobs_status, - 'handler': communicator.get_jobs, - 'next_queue': None, - 'process_req_post_hook': True}, - 'processing_get_events': {'pre_check': communicator.check_get_events_status, - 'handler': communicator.get_events, - 'next_queue': None, - 'process_req_post_hook': True} - } + + return {'request_get_jobs': {'pre_check': communicator.pre_check_get_jobs, + 'handler': communicator.request_get_jobs, + 'next_queue': 'processing_get_jobs', + 'process_req_post_hook': False}, + 'request_get_events': {'pre_check': communicator.pre_check_get_events, + 'handler': communicator.request_get_events, + 'next_queue': 'processing_get_events', + 'process_req_post_hook': False}, + 'update_jobs': {'pre_check': communicator.pre_check_update_jobs, + 'handler': communicator.update_jobs, + 'next_queue': None, + 'process_req_post_hook': True}, + 'update_events': {'pre_check': communicator.pre_check_update_events, + 'handler': communicator.update_events, + 'next_queue': None, + 'process_req_post_hook': True}, + 'processing_get_jobs': 
{'pre_check': communicator.check_get_jobs_status, + 'handler': communicator.get_jobs, + 'next_queue': None, + 'process_req_post_hook': True}, + 'processing_get_events': {'pre_check': communicator.check_get_events_status, + 'handler': communicator.get_events, + 'next_queue': None, + 'process_req_post_hook': True} + } + + def run(self): + """Handle communication requests.""" + processor = self.get_processor() while True: has_req = False @@ -420,7 +435,7 @@ def run(self): if self.is_stop(): while not self.queues[process_type].empty(): req = self.queues[process_type].get() - logger.info(f"Is going to stop, aborting request: {req}") + logger.info(f"is going to stop, aborting request: {req}") req.abort = True resp_attrs = {'status': None, 'content': None, diff --git a/pilot/eventservice/communicationmanager/plugins/basecommunicator.py b/pilot/eventservice/communicationmanager/plugins/basecommunicator.py index 2e458501..49ce8f55 100644 --- a/pilot/eventservice/communicationmanager/plugins/basecommunicator.py +++ b/pilot/eventservice/communicationmanager/plugins/basecommunicator.py @@ -18,7 +18,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2020-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2020-24 """Base communicator.""" @@ -33,7 +33,7 @@ class BaseCommunicator: _instance = None - def __new__(class_, *args: Any, **kwargs: dict) -> Any: + def __new__(cls, *args: Any, **kwargs: dict) -> Any: """ Create new instance of class. @@ -41,10 +41,10 @@ def __new__(class_, *args: Any, **kwargs: dict) -> Any: :param kwargs: kwargs dictionary (dict) :return: new class instance (Any). 
""" - if not isinstance(class_._instance, class_): - class_._instance = object.__new__(class_, *args, **kwargs) + if not isinstance(cls._instance, cls): + cls._instance = object.__new__(cls, *args, **kwargs) - return class_._instance + return cls._instance def __init__(self, *args: Any, **kwargs: dict): """ @@ -53,9 +53,9 @@ def __init__(self, *args: Any, **kwargs: dict): :param args: args object (Any) :param kwargs: kwargs dictionary (dict) """ - super(BaseCommunicator, self).__init__() - for key in kwargs: - setattr(self, key, kwargs[key]) + super().__init__() + for key, value in kwargs.items(): + setattr(self, key, value) def pre_check_get_jobs(self, req: Any): """ diff --git a/pilot/eventservice/communicationmanager/plugins/pandacommunicator.py b/pilot/eventservice/communicationmanager/plugins/pandacommunicator.py index c1c34697..6b7531f0 100644 --- a/pilot/eventservice/communicationmanager/plugins/pandacommunicator.py +++ b/pilot/eventservice/communicationmanager/plugins/pandacommunicator.py @@ -18,7 +18,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2020-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2020-24 """PanDA communicator.""" @@ -47,13 +47,13 @@ def __init__(self, *args: Any, **kwargs: dict): :param args: args object (Any) :param kwargs: kwargs dictionary (dict) """ - super(PandaCommunicator, self).__init__(args, kwargs) + super().__init__(args, kwargs) self.get_jobs_lock = threading.Lock() self.get_events_lock = threading.Lock() self.update_events_lock = threading.Lock() self.update_jobs_lock = threading.Lock() - def pre_check_get_jobs(self, req=None) -> Any: + def pre_check_get_jobs(self, req: Any = None) -> Any: """ Check whether it's ok to send a request to get jobs. 
@@ -71,7 +71,7 @@ def request_get_jobs(self, req: Any) -> Any: """ return CommunicationResponse({'status': 0}) - def check_get_jobs_status(self, req=None): + def check_get_jobs_status(self, req: Any = None): """ Check whether jobs are prepared. @@ -80,6 +80,23 @@ def check_get_jobs_status(self, req=None): """ return CommunicationResponse({'status': 0}) + def get_data(self, req: Any) -> dict: + """ + Get data from request. + + :param req: request (Any) + :return: data dictionary (dict). + """ + data = {'getProxyKey': 'False'} + kmap = {'node': 'node', 'mem': 'mem', 'getProxyKey': 'getProxyKey', 'computingElement': 'queue', + 'diskSpace': 'disk_space', + 'siteName': 'site', 'prodSourceLabel': 'job_label', 'workingGroup': 'working_group', 'cpu': 'cpu'} + for key, value in list(kmap.items()): + if hasattr(req, value): + data[key] = getattr(req, value) + + return data + def get_jobs(self, req: Any) -> dict: """ Get the job definition from panda server. @@ -93,14 +110,10 @@ def get_jobs(self, req: Any) -> dict: jobs = [] resp_attrs = None - data = {'getProxyKey': 'False'} - kmap = {'node': 'node', 'mem': 'mem', 'getProxyKey': 'getProxyKey', 'computingElement': 'queue', 'diskSpace': 'disk_space', - 'siteName': 'site', 'prodSourceLabel': 'job_label', 'workingGroup': 'working_group', 'cpu': 'cpu'} - for key, value in list(kmap.items()): # Python 2/3 - if hasattr(req, value): - data[key] = getattr(req, value) + # get the data dictionary + data = self.get_data(req) - for i in range(req.num_jobs): + for _ in range(req.num_jobs): logger.info(f"Getting jobs: {data}") url = environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver) res = https.request(f'{url}/server/panda/getJob', data=data) @@ -109,7 +122,7 @@ def get_jobs(self, req: Any) -> dict: if res is None: resp_attrs = {'status': None, 'content': None, 'exception': exception.CommunicationFailure("Get job failed to get response from Panda.")} break - elif res['StatusCode'] == 20 and 'no jobs in PanDA' in res['errorDialog']: 
+ if res['StatusCode'] == 20 and 'no jobs in PanDA' in res['errorDialog']: resp_attrs = {'status': res['StatusCode'], 'content': None, 'exception': exception.CommunicationFailure("No jobs in panda")} @@ -127,7 +140,7 @@ def get_jobs(self, req: Any) -> dict: resp_attrs = {'status': -1, 'content': None, 'exception': exception.UnknownException("Failed to get jobs")} resp = CommunicationResponse(resp_attrs) - except Exception as e: # Python 2/3 + except Exception as e: logger.error(f"Failed to get jobs: {e}, {traceback.format_exc()}") resp_attrs = {'status': -1, 'content': None, 'exception': exception.UnknownException(f"Failed to get jobs: {traceback.format_exc()}")} resp = CommunicationResponse(resp_attrs) diff --git a/pilot/eventservice/esprocess/eshook.py b/pilot/eventservice/esprocess/eshook.py index 6da9a28e..cb33ea30 100644 --- a/pilot/eventservice/esprocess/eshook.py +++ b/pilot/eventservice/esprocess/eshook.py @@ -18,7 +18,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2023-24 """Hooks for EventService.""" @@ -30,18 +30,21 @@ def get_payload(self) -> dict: """ Get payload to execute. - :return: {'payload': , 'output_file': , 'error_file': } (dict). + Should return: {'payload': , 'output_file': , 'error_file': } (dict). + :raises Exception: if anything goes wrong. """ - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") def get_event_ranges(self, num_ranges: int = 1) -> dict: """ Get event ranges. + Should returns: dictionary of event ranges (dict). + :param num_ranges: Number of event ranges to download, default is 1 (int) - :returns: dictionary of event ranges (dict). + :raises Exception: if anything goes wrong. 
""" - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") def handle_out_message(self, message: dict): """ @@ -53,5 +56,6 @@ def handle_out_message(self, message: dict): For 'failed' event ranges, it's {'id': , 'status': 'finished', 'message': }. :param message: dictionary of a parsed message (dict). + :raises Exception: if anything goes wrong. """ - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") diff --git a/pilot/eventservice/esprocess/esmanager.py b/pilot/eventservice/esprocess/esmanager.py index 2bcabb35..0c3b868c 100644 --- a/pilot/eventservice/esprocess/esmanager.py +++ b/pilot/eventservice/esprocess/esmanager.py @@ -43,7 +43,7 @@ def __init__(self, hook: Any): """ logger.info('initializing hooks') if not isinstance(hook, ESHook): - raise Exception(f"hook({hook}) is not instance of {ESHook}") + raise TypeError(f"hook({hook}) is not instance of {ESHook}") self.__hook = hook logger.info('initialized hooks') diff --git a/pilot/eventservice/esprocess/esmessage.py b/pilot/eventservice/esprocess/esmessage.py index 05c667e4..3d59d0a2 100644 --- a/pilot/eventservice/esprocess/esmessage.py +++ b/pilot/eventservice/esprocess/esmessage.py @@ -17,7 +17,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2021-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2021-24 """Event Service message class.""" @@ -47,7 +47,7 @@ def __init__(self, message_queue: Any, socket_name: str = None, context: str = ' :raises MessageFailure: when failed to set up message socket. 
""" threading.Thread.__init__(self, **kwds) - self.setName("MessageThread") + self.name = "MessageThread" self.__message_queue = message_queue self._socket_name = socket_name self.__stop = threading.Event() @@ -55,7 +55,7 @@ def __init__(self, message_queue: Any, socket_name: str = None, context: str = ' try: import yampl except Exception as exc: - raise MessageFailure(f"Failed to import yampl: {exc}") + raise MessageFailure(f"Failed to import yampl: {exc}") from exc logger.info('setup yampl server socket') try: @@ -63,7 +63,8 @@ def __init__(self, message_queue: Any, socket_name: str = None, context: str = ' self._socket_name = f'EventService_EventRanges_{os.getpid()}' self.__message_server = yampl.ServerSocket(self._socket_name, context) except Exception as exc: - raise MessageFailure(f"failed to set up yampl server socket: {exc} {traceback.print_exc()}") + raise MessageFailure(f"failed to set up yampl server socket: {exc} {traceback.print_exc()}") from exc + logger.info(f'finished setting up yampl server socket (socket_name: {self._socket_name}, context:{context}).') def get_yampl_socket_name(self) -> str: @@ -87,7 +88,7 @@ def send(self, message: str): raise MessageFailure("No message server.") self.__message_server.send_raw(message.encode('utf8')) except Exception as exc: - raise MessageFailure(exc) + raise MessageFailure(exc) from exc def stop(self): """Set stop event.""" diff --git a/pilot/eventservice/esprocess/esprocess.py b/pilot/eventservice/esprocess/esprocess.py index b62743b2..d4722aa1 100644 --- a/pilot/eventservice/esprocess/esprocess.py +++ b/pilot/eventservice/esprocess/esprocess.py @@ -54,6 +54,7 @@ from pilot.util.processes import kill_child_processes logger = logging.getLogger(__name__) +athenopts_re = re.compile(r'--athenaopts=\'([\w\=\-\"\' ]+)\'') class ESProcess(threading.Thread): @@ -87,7 +88,7 @@ def __init__(self, payload, waiting_time=30 * 60): self.__is_payload_started = False self.__ret_code = None - self.setName("ESProcess") + 
self.name = "ESProcess" self.corecount = 1 self.event_ranges_cache = [] @@ -105,7 +106,7 @@ def is_payload_started(self): """ return self.__is_payload_started - def stop(self, delay=1800): + def stop(self, delay: int = 1800): """ Stop the process. @@ -122,7 +123,7 @@ def init_message_thread(self, socketname: str = None, context: str = 'local'): """ Initialize message thread. - :param socket_name: name of the socket between current process and payload (str) + :param socketname: name of the socket between current process and payload (str) :param context: name of the context between current process and payload, default is 'local' (str) :raises MessageFailure: when failed to init message thread. """ @@ -136,7 +137,8 @@ def init_message_thread(self, socketname: str = None, context: str = 'local'): except Exception as exc: logger.error(f"failed to start message thread: {exc}") self.__ret_code = -1 - raise MessageFailure(exc) + raise MessageFailure(exc) from exc + logger.info("finished initializing message thread") def stop_message_thread(self): @@ -158,30 +160,45 @@ def init_yampl_socket(self, executable: str) -> str: socket_name = self.__message_thread.get_yampl_socket_name() is_ca = "--CA" in executable + is_mt = "--multithreaded=true" in executable.lower() if is_ca: - preexec_socket_config = f" --preExec 'ConfigFlags.MP.EventRangeChannel=\"{socket_name}\"' " + if is_mt: + preexec_socket_config = f" --mtes=True --mtes-channel=\"{socket_name}\" " + else: + preexec_socket_config = f" --preExec 'ConfigFlags.MP.EventRangeChannel=\"{socket_name}\"' " else: preexec_socket_config = \ f" --preExec 'from AthenaMP.AthenaMPFlags import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"{socket_name}\"' " + if is_mt: + logger.warning("event service is not supported in MT job without CA") if "PILOT_EVENTRANGECHANNEL" in executable: executable = f"export PILOT_EVENTRANGECHANNEL=\"{socket_name}\"; " + executable + elif is_mt and is_ca: + has_opts = "--athenaopts" in 
executable + if has_opts: + executable = athenopts_re.sub(fr"--athenaopts='\1 {preexec_socket_config}'", executable) + else: + executable = executable.strip() + if executable.endswith(";"): + executable = executable[:-1] + executable += f" --athenaopts='{preexec_socket_config}' " + elif "--preExec" not in executable: executable = executable.strip() if executable.endswith(";"): executable = executable[:-1] executable += preexec_socket_config + + if "import jobproperties as jps" in executable: + executable = executable.replace("import jobproperties as jps;", + f"import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"{socket_name}\";") + if is_ca: + logger.warning("Found jobproperties config in CA job") + elif "--preExec " in executable: + executable = executable.replace("--preExec ", preexec_socket_config) else: - if "import jobproperties as jps" in executable: - executable = executable.replace("import jobproperties as jps;", - f"import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"{socket_name}\";") - if is_ca: - logger.warning("Found jobproperties config in CA job") - else: - if "--preExec " in executable: - executable = executable.replace("--preExec ", preexec_socket_config) - else: - logger.warn(f"--preExec has an unknown format - expected '--preExec \"' or \"--preExec '\", got: {executable}") + logger.warning(f"--preExec has an unknown format - expected '--preExec \"' or \"--preExec '\", got: {executable}") return executable @@ -190,6 +207,7 @@ def init_payload_process(self): Initialize payload process. :raise SetupFailure: when failed to init payload process. + :raise Exception: when failed to prepare container command. 
""" logger.info("initializing payload process") try: @@ -213,7 +231,7 @@ def init_payload_process(self): except Exception as e: msg = f'exception caught while preparing container command: {e}' logger.warning(msg) - raise SetupFailure(msg) + raise SetupFailure(msg) from e else: logger.warning('could not containerise executable') @@ -232,7 +250,8 @@ def init_payload_process(self): except Exception as exc: logger.error(f"failed to start payload process: {exc}, {traceback.format_exc()}") self.__ret_code = -1 - raise SetupFailure(exc) + raise SetupFailure(exc) from exc + logger.info("finished initializing payload process") def get_file(self, workdir: str, file_label: str = 'output_file', @@ -252,10 +271,10 @@ def get_file(self, workdir: str, file_label: str = 'output_file', _file_fd = self.__payload[file_label] else: _file = self.__payload[file_label] if '/' in self.__payload[file_label] else os.path.join(workdir, self.__payload[file_label]) - _file_fd = open(_file, 'w') + _file_fd = open(_file, 'w', encoding='utf-8') else: _file = os.path.join(workdir, file_name) - _file_fd = open(_file, 'w') + _file_fd = open(_file, 'w', encoding='utf-8') return _file_fd @@ -275,7 +294,7 @@ def get_workdir(self) -> str: try: os.makedirs(workdir) except OSError as exc: - raise SetupFailure(f"failed to create workdir: {exc}") + raise SetupFailure(f"failed to create workdir: {exc}") from exc elif not os.path.isdir(workdir): raise SetupFailure('workdir exists but is not a directory') @@ -347,8 +366,8 @@ def monitor(self): """ if self.__no_more_event_time and time.time() - self.__no_more_event_time > self.__waiting_time: self.__ret_code = -1 - raise Exception(f'Too long time ({time.time() - self.__no_more_event_time} seconds) ' - f'since \"No more events\" is injected') + raise TimeoutError(f'Too long time ({time.time() - self.__no_more_event_time} seconds) ' + f'since \"No more events\" is injected') if self.__monitor_log_time is None or self.__monitor_log_time < time.time() - 10 * 60: 
self.__monitor_log_time = time.time() @@ -411,8 +430,8 @@ def get_event_range_to_payload(self) -> list: if self.event_ranges_cache: event_range = self.event_ranges_cache.pop(0) return event_range - else: - return [] + + return [] def get_event_ranges(self, num_ranges: int = None) -> list: """ @@ -436,7 +455,7 @@ def get_event_ranges(self, num_ranges: int = None) -> list: logger.debug(f'got event ranges: {event_ranges}') return event_ranges except Exception as e: - raise MessageFailure(f"Failed to get event ranges: {e}") + raise MessageFailure(f"Failed to get event ranges: {e}") from e def send_event_ranges_to_payload(self, event_ranges: list): """ @@ -449,9 +468,10 @@ def send_event_ranges_to_payload(self, event_ranges: list): self.is_no_more_events = True self.__no_more_event_time = time.time() else: - if type(event_ranges) is not list: + if not isinstance(event_ranges, list): event_ranges = [event_ranges] msg = json.dumps(event_ranges) + logger.debug(f'send event ranges to payload: {msg}') self.__message_thread.send(msg) @@ -466,17 +486,7 @@ def parse_out_message(self, message: str) -> dict: """ logger.debug(f'parsing message: {message}') try: - if message.startswith("/"): - parts = message.split(",") - ret = {'output': parts[0]} - parts = parts[1:] - for part in parts: - name, value = part.split(":") - name = name.lower() - ret[name] = value - ret['status'] = 'finished' - return ret - elif message.startswith('ERR'): + if message.startswith('ERR'): if "ERR_ATHENAMP_PARSE" in message: pattern = re.compile(r"(ERR\_[A-Z\_]+)\ (.+)\:\ ?(.+)") found = re.findall(pattern, message) @@ -487,20 +497,30 @@ def parse_out_message(self, message: str) -> dict: event_range_id = found[0] ret = {'id': event_range_id, 'status': 'failed', 'message': message} return ret - else: - raise Exception(f"Failed to parse {message}") - else: - pattern = re.compile(r"(ERR\_[A-Z\_]+)\ ([0-9A-Za-z._\-]+)\:\ ?(.+)") - found = re.findall(pattern, message) - event_range_id = found[0][1] - ret = 
{'id': event_range_id, 'status': 'failed', 'message': message} - return ret - else: - raise UnknownException(f"Unknown message {message}") + + raise ValueError(f"Failed to parse {message}") + + pattern = re.compile(r"(ERR\_[A-Z\_]+)\ ([0-9A-Za-z._\-]+)\:\ ?(.+)") + found = re.findall(pattern, message) + event_range_id = found[0][1] + ret = {'id': event_range_id, 'status': 'failed', 'message': message} + + return ret + + parts = message.split(",") + ret = {'output': parts[0]} + parts = parts[1:] + for part in parts: + name, value = part.split(":") + name = name.lower() + ret[name] = value + ret['status'] = 'finished' + + return ret except PilotException as e: raise e except Exception as e: - raise UnknownException(e) + raise UnknownException(e) from e def handle_out_message(self, message: str): """ @@ -522,7 +542,7 @@ def handle_out_message(self, message: str): logger.debug(f'calling handle_out_message hook({self.handle_out_message_hook}) to handle parsed message.') self.handle_out_message_hook(message_status) except Exception as e: - raise RunPayloadFailure(f"Failed to handle out message: {e}") + raise RunPayloadFailure(f"Failed to handle out message: {e}") from e def handle_messages(self): """Monitor the message queue to get output or error messages from payload and response to different messages.""" @@ -568,7 +588,7 @@ def terminate(self, time_to_wait: int = 1): else: logger.error(f"payload finished with error code: {self.__process.poll()}") else: - for i in range(time_to_wait * 10): + for _ in range(time_to_wait * 10): if not self.__process.poll() is None: break time.sleep(1) @@ -591,7 +611,7 @@ def terminate(self, time_to_wait: int = 1): logger.error(f'Exception caught when terminating ESProcess: {e}') self.__ret_code = -1 self.stop() - raise UnknownException(e) + raise UnknownException(e) from e def kill(self): """ @@ -618,7 +638,7 @@ def kill(self): except Exception as exc: logger.error(f'exception caught when terminating ESProcess: {exc}') self.stop() - 
raise UnknownException(exc) + raise UnknownException(exc) from exc def clean(self): """Clean left resources.""" diff --git a/pilot/eventservice/esprocess/esprocessfinegrainedproc.py b/pilot/eventservice/esprocess/esprocessfinegrainedproc.py index 641902fc..1fdd47bf 100644 --- a/pilot/eventservice/esprocess/esprocessfinegrainedproc.py +++ b/pilot/eventservice/esprocess/esprocessfinegrainedproc.py @@ -17,6 +17,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2024 import io import logging diff --git a/pilot/eventservice/esprocess/hooks/acthook.py b/pilot/eventservice/esprocess/hooks/acthook.py index bdb0b700..0cbc3a5b 100644 --- a/pilot/eventservice/esprocess/hooks/acthook.py +++ b/pilot/eventservice/esprocess/hooks/acthook.py @@ -18,7 +18,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2023-24 """Hooks for ARC-ControlTower EventService.""" @@ -32,20 +32,23 @@ def get_payload(self) -> dict: """ Get payload to execute. - :return: {'payload': , 'output_file': , 'error_file': } (dict) + Should return: {'payload': , 'output_file': , 'error_file': } (dict) + :raises Exception: if anything goes wrong. """ - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") def get_event_ranges(self, num_ranges: int = 1) -> dict: """ Get event ranges. + Should return: dictionary of event ranges (dict). + :param num_ranges: number of event ranges to get (int) - :return: dictionary of event ranges (dict). + :raises Exception: if anything goes wrong. """ - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") - def handle_out_message(self, message): + def handle_out_message(self, message: dict): """ Handle ES output or error messages. @@ -57,4 +60,4 @@ def handle_out_message(self, message): :param message: dictionary of a parsed message (dict). :raises Exception: if anything goes wrong. 
""" - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") diff --git a/pilot/eventservice/esprocess/hooks/harvesterhook.py b/pilot/eventservice/esprocess/hooks/harvesterhook.py index 61c4ad10..43ba24f0 100644 --- a/pilot/eventservice/esprocess/hooks/harvesterhook.py +++ b/pilot/eventservice/esprocess/hooks/harvesterhook.py @@ -32,18 +32,21 @@ def get_payload(self) -> dict: """ Get payload to execute. - :return: {'payload': , 'output_file': , 'error_file': } (dict). + Should return: {'payload': , 'output_file': , 'error_file': } (dict). + :raises Exception: if anything goes wrong. """ - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") def get_event_ranges(self, num_ranges: int = 1) -> dict: """ Get event ranges. + Should return: dictionary of event ranges (dict). + :param num_ranges: Number of event ranges to download, default is 1 (int) - :return: dictionary of event ranges (dict). + :raises Exception: if anything goes wrong. """ - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") def handle_out_message(self, message: dict): """ @@ -55,5 +58,6 @@ def handle_out_message(self, message: dict): For 'failed' event ranges, it's {'id': , 'status': 'finished', 'message': }. :param message: dictionary of parsed message (dict). + :raises Exception: if anything goes wrong. 
""" - raise Exception("Not Implemented") + raise NotImplementedError("Not Implemented") diff --git a/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py b/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py index 47e639c3..53fd52f1 100644 --- a/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py @@ -183,6 +183,8 @@ def handle_out_message(self, message: dict): if message['status'] in ['failed', 'fatal']: self.update_failed_event_ranges([message]) else: + if 'output' in message: + message['output'] = os.path.join(self.get_job().workdir, message['output']) self.__queued_out_messages.append(message) def stageout_es(self, force: bool = False): diff --git a/pilot/info/dataloader.py b/pilot/info/dataloader.py index 9bea57e0..9acfc753 100644 --- a/pilot/info/dataloader.py +++ b/pilot/info/dataloader.py @@ -17,7 +17,7 @@ # # Authors: # - Alexey Anisenkov, anisyonk@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2019-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2019-24 """ Base loader class to retrieve data from Ext sources (file, url). @@ -31,9 +31,6 @@ import logging import os import time -import urllib.request -import urllib.error -import urllib.parse from datetime import ( datetime, timedelta @@ -41,7 +38,7 @@ from typing import Any from pilot.util.timer import timeout -from pilot.util.https import ctx +from pilot.util.https import download_file logger = logging.getLogger(__name__) @@ -116,25 +113,6 @@ def _readfile(url: str) -> str: return "" - def _readurl(url: str, _timeout: int = 20) -> str: - """ - Read url content. - - :param url: url (str) - :return: url content (str). 
- """ - req = urllib.request.Request(url) - req.add_header('User-Agent', ctx.user_agent) - try: - with urllib.request.urlopen(req, context=ctx.ssl_context, timeout=_timeout) as response: - content = response.read() - except urllib.error.URLError as exc: - logger.warning(f"error occurred with urlopen: {exc.reason}") - # Handle the error, set content to None or handle as needed - content = "" - - return content - content = None if url and cls.is_file_expired(fname, cache_time): # load data into temporary cache file for trial in range(1, nretry + 1): @@ -147,9 +125,7 @@ def _readurl(url: str, _timeout: int = 20) -> str: content = _readfile(url) else: logger.info(f'[attempt={trial}/{nretry}] loading data from url {url}') - req = urllib.request.Request(url) - req.add_header('User-Agent', ctx.user_agent) - content = _readurl(url) + content = download_file(url) if fname: # save to cache with open(fname, "w+", encoding='utf-8') as _file: diff --git a/pilot/scripts/open_file.sh b/pilot/scripts/open_file.sh new file mode 100644 index 00000000..c213a824 --- /dev/null +++ b/pilot/scripts/open_file.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# Script for remote file open +date +export XRD_LOGLEVEL=Debug +lsetup 'root pilot-default' +echo LSETUP_COMPLETED +date +python3 REPLACE_ME_FOR_CMD +echo "Script execution completed." +exit $? 
diff --git a/pilot/test/test_analytics.py b/pilot/test/test_analytics.py index ad0b8fcb..74b28248 100644 --- a/pilot/test/test_analytics.py +++ b/pilot/test/test_analytics.py @@ -44,7 +44,8 @@ def test_linear_fit(self): fit = self.client.fit(x, y) slope = fit.slope() intersect = fit.intersect() - + print(slope) + print(intersect) self.assertEqual(type(slope), float) self.assertEqual(slope, 1.0) self.assertEqual(type(intersect), float) @@ -57,7 +58,7 @@ def test_linear_fit(self): self.assertEqual(slope, -1.0) - def test_parsing_memory_monitor_data(self): + def est_parsing_memory_monitor_data(self): """Read and fit PSS vs Time from memory monitor output file.""" # old MemoryMonitor format filename = 'pilot/test/resource/memory_monitor_output.txt' diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 38d6691e..9216ba13 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -46,6 +46,7 @@ PilotException, FileHandlingFailure ) +from pilot.info.filespec import FileSpec from pilot.util.config import config from pilot.util.constants import ( UTILITY_BEFORE_PAYLOAD, @@ -72,7 +73,7 @@ update_extension, write_file, ) -from pilot.info.filespec import FileSpec +from pilot.util.https import upload_file from pilot.util.processes import ( convert_ps_to_dict, find_pid, find_cmd_pids, @@ -80,8 +81,10 @@ is_child ) from pilot.util.tracereport import TraceReport - -from .container import create_root_container_command +from .container import ( + create_root_container_command, + execute_remote_file_open +) from .dbrelease import get_dbrelease_version, create_dbrelease from .setup import ( should_pilot_prepare_setup, @@ -178,7 +181,7 @@ def validate(job: Any) -> bool: return status -def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, list): +def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, list): # noqa: C901 """ Verify that direct i/o files can be opened. 
@@ -198,54 +201,91 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l # execute file open script which will attempt to open each file # copy pilot source into container directory, unless it is already there - script = 'open_remote_file.py' diagnostics = copy_pilot_source(workdir) if diagnostics: raise PilotException(diagnostics) - final_script_path = os.path.join(workdir, script) os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH') + ':' + workdir - script_path = os.path.join('pilot/scripts', script) - dir1 = os.path.join(os.path.join(os.environ['PILOT_HOME'], 'pilot3'), script_path) - dir2 = os.path.join(workdir, script_path) - full_script_path = dir1 if os.path.exists(dir1) else dir2 - if not os.path.exists(full_script_path): - # do not set ec since this will be a pilot issue rather than site issue - diagnostics = ( - f'cannot perform file open test - script path does not exist: {full_script_path}' - ) + + # first copy all scripts that are needed + scripts = ['open_remote_file.py', 'open_file.sh'] + final_paths = {} + for script in scripts: + + final_script_path = os.path.join(workdir, script) + script_path = os.path.join('pilot/scripts', script) + dir1 = os.path.join(os.path.join(os.environ['PILOT_HOME'], 'pilot3'), script_path) + dir2 = os.path.join(workdir, script_path) + full_script_path = dir1 if os.path.exists(dir1) else dir2 + if not os.path.exists(full_script_path): + # do not set ec since this will be a pilot issue rather than site issue + diagnostics = ( + f'cannot perform file open test - script path does not exist: {full_script_path}' + ) + logger.warning(diagnostics) + logger.warning(f'tested both path={dir1} and path={dir2} (none exists)') + return exitcode, diagnostics, not_opened + + try: + copy(full_script_path, final_script_path) + except PilotException as exc: + # do not set ec since this will be a pilot issue rather than site issue + diagnostics = f'cannot perform file open test - pilot source copy failed: 
{exc}' + logger.warning(diagnostics) + return exitcode, diagnostics, not_opened + + # correct the path when containers have been used + if "open_remote_file.py" in script: + final_script_path = os.path.join('.', script) + + final_paths[script] = final_script_path + logger.debug(f'final path={final_script_path}') + + logger.debug(f'reading file: {final_paths["open_file.sh"]}') + script_content = read_file(final_paths['open_file.sh']) + if not script_content: + diagnostics = (f'cannot perform file open test - failed to read script content from path ' + f'{final_paths["open_file.sh"]}') logger.warning(diagnostics) - logger.warning(f'tested both path={dir1} and path={dir2} (none exists)') return exitcode, diagnostics, not_opened - try: - copy(full_script_path, final_script_path) - except PilotException as exc: - # do not set ec since this will be a pilot issue rather than site issue - diagnostics = f'cannot perform file open test - pilot source copy failed: {exc}' + + logger.debug(f'creating file open command from path: {final_paths["open_remote_file.py"]}') + _cmd = get_file_open_command(final_paths['open_remote_file.py'], turls, nthreads) + if not _cmd: + diagnostics = (f'cannot perform file open test - failed to create file open command from path ' + f'{final_paths["open_remote_file.py"]}') logger.warning(diagnostics) return exitcode, diagnostics, not_opened - # correct the path when containers have been used - final_script_path = os.path.join('.', script) - - _cmd = get_file_open_command(final_script_path, turls, nthreads) - cmd = create_root_container_command(workdir, _cmd) - timeout = get_timeout_for_remoteio(indata) - logger.info(f'executing file open verification script (timeout={timeout}):\n\n\'{cmd}\'\n\n') + cmd = create_root_container_command(workdir, _cmd, script_content) + path = os.path.join(workdir, 'open_remote_file_cmd.sh') + logger.info(f'executing file open verification script (path={path}, timeout={timeout}):\n\n\'{cmd}\'\n\n') + try: + 
write_file(path, cmd) + except FileHandlingFailure as exc: + diagnostics = f'failed to write file: {exc}' + logger.warning(diagnostics) + return 11, diagnostics, not_opened - exitcode, stdout, stderr = execute(cmd, usecontainer=False, timeout=timeout) - if config.Pilot.remotefileverification_log: - fpath = os.path.join(workdir, config.Pilot.remotefileverification_log) - write_file(fpath, stdout + stderr, mute=False) + try: + exitcode, stdout = execute_remote_file_open(path, timeout) + except PilotException as exc: + logger.warning(f'caught pilot exception: {exc}') + exitcode = 11 + stdout = exc +# exitcode, stdout, stderr = execute(cmd, usecontainer=False, timeout=timeout) +# if config.Pilot.remotefileverification_log: +# fpath = os.path.join(workdir, config.Pilot.remotefileverification_log) +# write_file(fpath, stdout + stderr, mute=False) logger.info(f'remote file open finished with ec={exitcode}') # error handling if exitcode: # first check for apptainer errors - _exitcode = errors.resolve_transform_error(exitcode, stdout + stderr) + _exitcode = errors.resolve_transform_error(exitcode, stdout) if _exitcode != exitcode: # a better error code was found (COMMANDTIMEDOUT error will be passed through) - return _exitcode, stderr, not_opened + return _exitcode, stdout, not_opened # note: if the remote files could still be opened the reported error should not be REMOTEFILEOPENTIMEDOUT _exitcode, diagnostics, not_opened = parse_remotefileverification_dictionary(workdir) @@ -2647,19 +2687,23 @@ def update_server(job: Any) -> None: # logger.debug(f'prmon json=\n{out}') # logger.debug(f'final logstash prmon dictionary: {metadata_dictionary}') url = 'https://pilot.atlas-ml.org' # 'http://collector.atlas-ml.org:80' - cmd = ( - f"curl --connect-timeout 20 --max-time 120 -H \"Content-Type: application/json\" -X POST " - f"--upload-file {new_path} {url}" - ) - # send metadata to logstash - try: - _, stdout, stderr = execute(cmd, usecontainer=False) - except Exception as exc: - 
logger.warning(f'exception caught: {exc}') + status = upload_file(url, new_path) + if status: + logger.info('sent prmon JSON dictionary to logstash server (urllib method)') else: - logger.debug('sent prmon JSON dictionary to logstash server') - logger.debug(f'stdout: {stdout}') - logger.debug(f'stderr: {stderr}') + cmd = ( + f"curl --connect-timeout 20 --max-time 120 -H \"Content-Type: application/json\" -X POST " + f"--upload-file {new_path} {url}" + ) + # send metadata to logstash + try: + _, stdout, stderr = execute(cmd, usecontainer=False) + except Exception as exc: + logger.warning(f'exception caught: {exc}') + else: + logger.info('sent prmon JSON dictionary to logstash server (curl method)') + logger.debug(f'stdout: {stdout}') + logger.debug(f'stderr: {stderr}') else: msg = 'no prmon json available - cannot send anything to logstash server' logger.warning(msg) diff --git a/pilot/user/atlas/container.py b/pilot/user/atlas/container.py index 145a3aac..f4ff323b 100644 --- a/pilot/user/atlas/container.py +++ b/pilot/user/atlas/container.py @@ -20,11 +20,14 @@ # - Paul Nilsson, paul.nilsson@cern.ch, 2017-23 # - Alexander Bogdanchikov, Alexander.Bogdanchikov@cern.ch, 2019-20 +import fcntl import json +import logging import os import pipes import re -import logging +import subprocess +import time from typing import Any, Callable # for user container test: import urllib @@ -766,19 +769,22 @@ def container_wrapper(cmd, workdir, job=None): return cmd -def create_root_container_command(workdir: str, cmd: str) -> str: +def create_root_container_command(workdir: str, cmd: str, script: str) -> str: """ Create the container command for root. :param workdir: workdir (str) :param cmd: command to be containerised (str) + :param script: script content (str) :return: container command to be executed (str). 
""" command = f'cd {workdir};' - content = get_root_container_script(cmd) + # parse the 'open_file.sh' script + content = get_root_container_script(cmd, script) script_name = 'open_file.sh' - + logger.info(f'{script_name}:\n\n{content}\n\n') try: + # overwrite the 'open_file.sh' script with updated information status = write_file(os.path.join(workdir, script_name), content) except PilotException as exc: raise exc @@ -799,6 +805,81 @@ def create_root_container_command(workdir: str, cmd: str) -> str: return command +def execute_remote_file_open(path: str, python_script_timeout: int) -> (int, str): + """ + Execute the remote file open script. + + :param path: path to container script (str) + :param workdir: workdir (str) + :param python_script_timeout: timeout (int) + :return: exit code (int), stdout (str). + """ + lsetup_timeout = 600 # Timeout for 'lsetup' step + exit_code = 1 + stdout = "" + + # Start the Bash script process with non-blocking I/O + try: + process = subprocess.Popen(["bash", path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0) + fcntl.fcntl(process.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) # Set non-blocking + except OSError as e: + logger.warning(f"error starting subprocess: {e}") + return exit_code + + start_time = time.time() # Track execution start time + lsetup_completed = False # Flag to track completion of 'lsetup' + + while True: + # Check for timeout (once per second) + if time.time() - start_time > lsetup_timeout and not lsetup_completed: + logger.warning("timeout for 'lsetup' exceeded - killing script") + process.kill() + break + + # Try to read output without blocking (might return None) + try: + output = process.stdout.readline() # Read bytes directly + if output is not None: # Check if any output is available (not None) + output = output.decode().strip() + logger.info(output) # Print output for monitoring + + # Check for LSETUP_COMPLETED message + if output == "LSETUP_COMPLETED": + lsetup_completed = True + 
start_time = time.time() # Reset start time for 'python3' timeout + + stdout += output + "\n" + except BlockingIOError: + time.sleep(0.1) # No output available yet, continue the loop + continue + except (OSError, ValueError): # Catch potential errors from process.stdout + # print(f"Error reading from subprocess output: {e}") + # # Handle the error (e.g., log it, retry, exit) + # break + # logger.warning(f"error reading from subprocess output: {e}") + time.sleep(0.1) + continue + + # Timeout for python script after LSETUP_COMPLETED + if lsetup_completed and time.time() - start_time > python_script_timeout: + logger.warning("timeout for 'python3' subscript exceeded - killing script") + process.kill() + break + + # Check if script has completed normally + return_code = process.poll() + if return_code is not None: + logger.info("script execution completed with return code: {return_code}") + exit_code = return_code + break + + # Ensure process is terminated + if process.poll() is None: + process.terminate() + + return exit_code, stdout + + def fix_asetup(asetup): """ Make sure that the command returned by get_asetup() contains a trailing ;-sign. @@ -887,17 +968,16 @@ def create_middleware_container_command(job, cmd, label='stagein', proxy=True): return command -def get_root_container_script(cmd: str) -> str: +def get_root_container_script(cmd: str, script: str) -> str: """ Return the content of the root container script. :param cmd: root command (str) + :param script: script content (str) :return: script content (str). """ - content = f'date\nexport XRD_LOGLEVEL=Debug\nlsetup \'root pilot-default\'\ndate\nstdbuf -oL bash -c \"python3 {cmd}\"\nexit $?' - logger.debug(f'root setup script content:\n\n{content}\n\n') - - return content + # content = f'date\nexport XRD_LOGLEVEL=Debug\nlsetup \'root pilot-default\'\ndate\nstdbuf -oL bash -c \"python3 {cmd}\"\nexit $?' 
+ return script.replace('REPLACE_ME_FOR_CMD', cmd) def get_middleware_container_script(middleware_container: str, cmd: str, asetup: bool = False, label: str = '') -> str: diff --git a/pilot/user/atlas/cvmfs.py b/pilot/user/atlas/cvmfs.py new file mode 100644 index 00000000..42212a33 --- /dev/null +++ b/pilot/user/atlas/cvmfs.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Authors: +# - Paul Nilsson, paul.nilsson@cern.ch, 2024 + +"""User specific functions/variables related to CVMFS operations.""" + +from .setup import get_file_system_root_path + +# CVMFS mount points +cvmfs_mount_points = [ + 'CVMFS_BASE/atlas.cern.ch/repo/sw', + 'CVMFS_BASE/atlas.cern.ch/repo/ATLASLocalRootBase/logDir/lastUpdate', + 'CVMFS_BASE/atlas-condb.cern.ch/repo/conditions/logDir/lastUpdate', + 'CVMFS_BASE/atlas-nightlies.cern.ch/repo/sw/logs/lastUpdate', + 'CVMFS_BASE/sft.cern.ch/lcg/lastUpdate', + 'CVMFS_BASE/unpacked.cern.ch/logDir/lastUpdate', + 'CVMFS_BASE/sft-nightlies.cern.ch/lcg/lastUpdate', +] +# when was the last cvmfs update? +last_update_file = '/cvmfs/sft.cern.ch/lcg/lastUpdate' + + +def get_cvmfs_base_path() -> str: + """ + Return the base path for CVMFS. + + :return: base path for CVMFS (str). 
+ """ + return get_file_system_root_path() diff --git a/pilot/user/atlas/default.cfg b/pilot/user/atlas/default.cfg index b1ff4469..2d03b8c1 100644 --- a/pilot/user/atlas/default.cfg +++ b/pilot/user/atlas/default.cfg @@ -268,10 +268,10 @@ jobs_list_file: worker_pandaids.json pandajob_file: HPCJobs.json # Name of file with worker report -workerAttributesFile: worker_attributes.json +workerattributes_file: worker_attributes.json # Name of file for declaration of stageout -StageOutnFile: event_status.dump.json +stageoutn_file: event_status.dump.json ################################ # HPC parameters diff --git a/pilot/user/atlas/jobmetrics.py b/pilot/user/atlas/jobmetrics.py index 493d36b1..6115ecdc 100644 --- a/pilot/user/atlas/jobmetrics.py +++ b/pilot/user/atlas/jobmetrics.py @@ -17,36 +17,46 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24 +"""Functions for building job metrics.""" + +import logging import os import re -import logging +from typing import Any from pilot.api import analytics from pilot.common.exception import FileHandlingFailure from pilot.util.config import config from pilot.util.jobmetrics import get_job_metrics_entry -from pilot.util.features import MachineFeatures, JobFeatures -from pilot.util.filehandling import find_last_line, read_file +from pilot.util.features import ( + MachineFeatures, + JobFeatures +) +from pilot.util.filehandling import ( + find_last_line, + read_file +) from pilot.util.math import float_to_rounded_string - from .cpu import get_core_count -from .common import get_db_info, get_resimevents +from .common import ( + get_db_info, + get_resimevents +) from .utilities import get_memory_monitor_output_filename logger = logging.getLogger(__name__) -def get_job_metrics_string(job, extra={}): +def get_job_metrics_string(job: Any, extra: dict = {}) -> str: """ Get the job metrics string. 
- :param job: job object + :param job: job object (Any) :param extra: any extra information to be added (dict) - :return: job metrics (string). + :return: job metrics (str). """ - job_metrics = "" # report core count (will also set corecount in job object) @@ -78,7 +88,7 @@ def get_job_metrics_string(job, extra={}): job_metrics += get_job_metrics_entry("resimevents", job.resimevents) # get the max disk space used by the payload (at the end of a job) - if job.state == "finished" or job.state == "failed" or job.state == "holding": + if job.state in {"finished", "failed", "holding"}: max_space = job.get_max_workdir_size() zero = 0 @@ -114,14 +124,13 @@ def get_job_metrics_string(job, extra={}): return job_metrics -def get_trace_exit_code(workdir): +def get_trace_exit_code(workdir: str) -> str: """ Look for any rucio trace curl problems using an env var and a file. :param workdir: payload work directory (str) :return: curl exit code (str). """ - trace_exit_code = os.environ.get('RUCIO_TRACE_ERROR', '0') if trace_exit_code == '0': # look for rucio_trace_error_file in case middleware container is used @@ -137,21 +146,21 @@ def get_trace_exit_code(workdir): return trace_exit_code -def add_features(job_metrics, corecount, add=[]): +def add_features(job_metrics: str, corecount: int, add: list = []) -> str: """ - Add job and machine feature data to the job metrics if available + Add job and machine feature data to the job metrics if available. + If a non-empty add list is specified, only include the corresponding features. If empty/not specified, add all. - :param job_metrics: job metrics (string). - :param corecount: core count (int). - :param add: features to be added (list). - :return: updated job metrics (string). + :param job_metrics: job metrics (str) + :param corecount: core count (int) + :param add: features to be added (list) + :return: updated job metrics (str). 
""" - if job_metrics and not job_metrics.endswith(' '): job_metrics += ' ' - def add_sub_features(job_metrics, features_dic, add=[]): + def add_sub_features(features_dic, add=[]): features_str = '' for key in features_dic.keys(): if add and key not in add: @@ -176,48 +185,49 @@ def add_sub_features(job_metrics, features_dic, add=[]): logger.warning(f'cannot process hs06 machine feature: {exc} (hs06={hs06}, total_cpu={total_cpu}, corecount={corecount})') features_list = [machinefeatures, jobfeatures] for feature_item in features_list: - features_str = add_sub_features(job_metrics, feature_item, add=add) + features_str = add_sub_features(feature_item, add=add) if features_str: job_metrics += features_str return job_metrics -def add_analytics_data(job_metrics, workdir, state): +def add_analytics_data(job_metrics: str, workdir: str, state: str) -> str: """ Add the memory leak+chi2 analytics data to the job metrics. - :param job_metrics: job metrics (string). - :param workdir: work directory (string). - :param state: job state (string). - :return: updated job metrics (string). + :param job_metrics: job metrics (str) + :param workdir: work directory (str) + :param state: job state (str) + :return: updated job metrics (str). 
""" - path = os.path.join(workdir, get_memory_monitor_output_filename()) if os.path.exists(path): client = analytics.Analytics() # do not include tails on final update - tails = False if (state == "finished" or state == "failed" or state == "holding") else True + tails = not (state in {"finished", "failed", "holding"}) data = client.get_fitted_data(path, tails=tails) slope = data.get("slope", "") chi2 = data.get("chi2", "") + intersect = data.get("intersect", "") if slope != "": job_metrics += get_job_metrics_entry("leak", slope) if chi2 != "": job_metrics += get_job_metrics_entry("chi2", chi2) + if intersect != "": + job_metrics += get_job_metrics_entry("intersect", intersect) return job_metrics -def add_event_number(job_metrics, workdir): +def add_event_number(job_metrics: str, workdir: str) -> str: """ - Extract event number from file and add to job metrics if it exists + Extract event number from file and add to job metrics if it exists. - :param job_metrics: job metrics (string). - :param workdir: work directory (string). - :return: updated job metrics (string). + :param job_metrics: job metrics (str) + :param workdir: work directory (str) + :return: updated job metrics (str). """ - path = os.path.join(workdir, 'eventLoopHeartBeat.txt') if os.path.exists(path): last_line = find_last_line(path) @@ -231,7 +241,7 @@ def add_event_number(job_metrics, workdir): return job_metrics -def get_job_metrics(job, extra={}): +def get_job_metrics(job: Any, extra: dict = {}) -> str: """ Return a properly formatted job metrics string. The format of the job metrics string is defined by the server. It will be reported to the server during updateJob. @@ -269,7 +279,7 @@ def get_job_metrics(job, extra={}): return job_metrics -def get_number_in_string(line, pattern=r'\ done\ processing\ event\ \#(\d+)\,'): +def get_number_in_string(line: str, pattern: str = r'\ done\ processing\ event\ \#(\d+)\,') -> int: """ Extract a number from the given string. 
@@ -277,11 +287,10 @@ def get_number_in_string(line, pattern=r'\ done\ processing\ event\ \#(\d+)\,'): done processing event #20166959, run #276689 22807 events read so far <<<=== This function will return 20166959 as in int. - :param line: line from a file (string). - :param pattern: reg ex pattern (raw string). + :param line: line from a file (str) + :param pattern: reg ex pattern (raw str) :return: extracted number (int). """ - event_number = None match = re.search(pattern, line) if match: diff --git a/pilot/user/atlas/setup.py b/pilot/user/atlas/setup.py index c2616383..1d6e1f1c 100644 --- a/pilot/user/atlas/setup.py +++ b/pilot/user/atlas/setup.py @@ -17,7 +17,7 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24 import os import re @@ -31,7 +31,7 @@ from pilot.util.auxiliary import find_pattern_in_list from pilot.util.container import execute from pilot.util.filehandling import read_file, write_file, copy, head - +from pilot.util.https import download_file from .metadata import get_file_info_from_xml import logging @@ -276,7 +276,6 @@ def get_analysis_trf(transform, workdir): if not status: return errors.TRFDOWNLOADFAILURE, diagnostics, "" - logger.info("successfully downloaded script") path = os.path.join(workdir, transform_name) logger.debug(f"changing permission of {path} to 0o755") try: @@ -288,7 +287,65 @@ def get_analysis_trf(transform, workdir): return ec, diagnostics, transform_name -def download_transform(url, transform_name, workdir): +def download_transform(url: str, transform_name: str, workdir: str) -> (bool, str): + """ + Download the transform from the given url + + :param url: download URL with path to transform (str) + :param transform_name: trf name (str) + :param workdir: work directory (str) + :return: status (bool), diagnostics (str). 
+ """ + status = False + diagnostics = "" + path = os.path.join(workdir, transform_name) + trial = 1 + max_trials = 3 + + # test if $HARVESTER_WORKDIR is set + harvester_workdir = os.environ.get('HARVESTER_WORKDIR') + if harvester_workdir is not None: + source_path = os.path.join(harvester_workdir, transform_name) + try: + copy(source_path, path) + status = True + except Exception as error: + diagnostics = f"failed to copy file {source_path} to {path} : {error}" + logger.error(diagnostics) + status = False + return status, diagnostics + + # try to download the trf a maximum of 3 times + while trial <= max_trials: + logger.info(f"downloading file {transform_name} [trial {trial}/{max_trials}]") + + content = download_file(url) + with open(path, "wb+") as _file: # note: binary mode, so no encoding is needed (or, encoding=None) + if content: + _file.write(content) + logger.info(f'saved data from \"{url}\" resource into file {path}, ' + f'length={len(content) / 1024.:.1f} kB') + status = True + + if not status: + # Analyze exit code / output + diagnostics = f'no data was downloaded from {url}' + logger.warning(diagnostics) + if trial == max_trials: + logger.fatal(f'could not download transform: {transform_name}') + break + else: + logger.info("will try again after 60 s") + sleep(60) + else: + logger.info(f"transform {transform_name} downloaded") + break + trial += 1 + + return status, diagnostics + + +def download_transform_old(url, transform_name, workdir): """ Download the transform from the given url :param url: download URL with path to transform (string). 
diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py index ca43c18b..d5cea6ea 100644 --- a/pilot/user/atlas/utilities.py +++ b/pilot/user/atlas/utilities.py @@ -479,6 +479,12 @@ def get_memory_monitor_info(workdir, allowtxtfile=False, name=""): # noqa: C901 logger.warning("standard memory fields were not found in prmon json (or json doesn't exist yet)") else: logger.info("extracted standard memory fields from prmon json") + try: + node['GPU'] = summary_dictionary['HW']['gpu'] + except Exception: + logger.warning("GPU info not found in prmon json") + else: + logger.info("GPU info extracted from prmon json") else: logger.warning('unknown memory monitor version') else: diff --git a/pilot/user/rubin/loopingjob_definitions.py b/pilot/user/rubin/loopingjob_definitions.py index 782b0fd5..664b1744 100644 --- a/pilot/user/rubin/loopingjob_definitions.py +++ b/pilot/user/rubin/loopingjob_definitions.py @@ -17,7 +17,7 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24 def allow_loopingjob_detection(): @@ -50,6 +50,7 @@ def remove_unwanted_files(workdir, files): "pilotlog" in _file or ".lib.tgz" in _file or ".py" in _file or + "memory_" in _file or "pandaJob" in _file): _files.append(_file) diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py index 6fdf6790..616c0964 100644 --- a/pilot/util/auxiliary.py +++ b/pilot/util/auxiliary.py @@ -191,6 +191,7 @@ def get_error_code_translation_dictionary() -> dict: errors.MISSINGINPUTFILE: [77, "Missing input file in SE"], # should pilot report this type of error to wrapper? 
errors.PANDAQUEUENOTACTIVE: [78, "PanDA queue is not active"], errors.COMMUNICATIONFAILURE: [79, "PanDA server communication failure"], + errors.CVMFSISNOTALIVE: [64, "CVMFS is not responding"], # same exit code as site offline errors.KILLSIGNAL: [137, "General kill signal"], # Job terminated by unknown kill signal errors.SIGTERM: [143, "Job killed by signal: SIGTERM"], # 128+15 errors.SIGQUIT: [131, "Job killed by signal: SIGQUIT"], # 128+3 @@ -782,3 +783,16 @@ def __str__(self): """Set and return the error string for string representation of the class instance.""" tmp = f' : {repr(self.args)}' if self.args else '' return f"{self.__class__.__name__}: {self.message}, timeout={self.timeout} seconds{tmp}" + + +def correct_none_types(data_dict: dict) -> dict: + """ + Correct None types in the given dictionary. + + :param data_dict: dictionary with None strings (dict) + :return: dictionary with corrected None types (dict). + """ + for key, value in data_dict.items(): + if value == 'None' or value == 'null': + data_dict[key] = None + return data_dict diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 90d45790..19f8bf94 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -18,7 +18,7 @@ # # Authors # - Mario Lassnig, mario.lassnig@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24 """Constamts.""" @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '4' # build number should be reset to '1' for every new development cycle +REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '84' # build number should be reset to '1' for every new 
development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/cvmfs.py b/pilot/util/cvmfs.py new file mode 100644 index 00000000..23d9560b --- /dev/null +++ b/pilot/util/cvmfs.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Authors: +# - Paul Nilsson, paul.nilsson@cern.ch, 2024 + +"""Functions related to CVMFS operations.""" + +import logging +import os +import signal +import time +import types + +logger = logging.getLogger(__name__) + + +class TimeoutException(Exception): + """Timeout exception.""" + pass + + +def timeout_handler(signum: int, frame: types.FrameType) -> None: + """Timeout handler.""" + raise TimeoutException + + +signal.signal(signal.SIGALRM, timeout_handler) + + +def is_cvmfs_available() -> bool or None: + """ + Check if CVMFS is available. + + :return: True if CVMFS is available, False if not available, None if user cvmfs module not implemented. 
+ """ + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + try: + user = __import__(f'pilot.user.{pilot_user}.cvmfs', globals(), locals(), [pilot_user], 0) + except ImportError: + logger.warning('user cvmfs module does not exist - skipping cvmfs checks') + return None + + mount_points = getattr(user, 'cvmfs_mount_points', None) + if mount_points: + found_bad_mount_point = False + for mount_point in mount_points: + # update any base path + get_base_path = getattr(user, 'get_cvmfs_base_path', None) + if get_base_path: + mount_point = mount_point.replace('CVMFS_BASE', get_base_path()) + if os.path.exists(mount_point): + logger.debug(f'CVMFS is available at {mount_point}') + else: + logger.warning(f'CVMFS is not available at {mount_point}') + found_bad_mount_point = True + break + if found_bad_mount_point: + return False + else: + return True + else: + logger.warning('cvmfs_mount_points not defined in user cvmfs module') + return None + + +def get_last_update() -> int: + """ + Check the last update time from the last update file. + + :return: last update time (int). 
+ """ + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + user = __import__(f'pilot.user.{pilot_user}.cvmfs', globals(), locals(), [pilot_user], 0) + last_update_file = getattr(user, 'last_update_file', None) + timestamp = None + if last_update_file: + if os.path.exists(last_update_file): + try: + timestamp = extract_timestamp(last_update_file) + except Exception as exc: + logger.warning(f'failed to read last update file: {exc}') + if timestamp: + now = int(time.time()) + logger.info(f'last cvmfs update on ' + f'{time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(timestamp))} ' + f'{now - timestamp} seconds ago {timestamp}()') + else: + logger.warning(f'last update file does not exist: {last_update_file}') + else: + logger.warning('last_update_file not defined in user cvmfs module') + + return timestamp + + +def extract_timestamp(filename: str) -> int: + """ + Extract the timestamp from the last update file. + + The function will wait a maximum of 5 minutes for the file to be read. If the timeout is thrown, the function will + return -1. + + :param filename: last update file name (str). + :return: timestamp (int). + """ + signal.alarm(300) # Set the timeout to 5 minutes + timestamp = 0 + try: + with open(filename, 'r') as file: + line = file.readline() # e.g. 
"2024-03-18 19:43:47 | lxcvmfs145.cern.ch | 1710787427" + parts = line.split("|") + if len(parts) >= 3: + timestamp = int(parts[2].strip()) # strip() is used to remove leading/trailing white spaces + except TimeoutException: + logger.warning("timeout caught while reading last update file") + return -1 + finally: + signal.alarm(0) # Disable the alarm + + return timestamp diff --git a/pilot/util/default.cfg b/pilot/util/default.cfg index 9c18d953..dd09b570 100644 --- a/pilot/util/default.cfg +++ b/pilot/util/default.cfg @@ -291,10 +291,10 @@ jobs_list_file: worker_pandaids.json pandajob_file: HPCJobs.json # Name of file with worker report -workerAttributesFile: worker_attributes.json +workerattributes_file: worker_attributes.json # Name of file for declaration of stageout -StageOutnFile: event_status.dump.json +stageoutn_file: event_status.dump.json ################################ # HPC parameters diff --git a/pilot/util/harvester.py b/pilot/util/harvester.py index 8a9c396c..9b59a796 100644 --- a/pilot/util/harvester.py +++ b/pilot/util/harvester.py @@ -154,7 +154,7 @@ def get_event_status_file(args: Any) -> str: work_dir = args.harvester_workdir else: work_dir = os.environ['PILOT_HOME'] - event_status_file = config.Harvester.stageoutnfile + event_status_file = config.Harvester.stageoutn_file event_status_file = os.path.join(work_dir, event_status_file) logger.debug(f'event_status_file = {event_status_file}') @@ -168,17 +168,12 @@ def get_worker_attributes_file(args: Any): :param args: Pilot arguments object (Any) :return: worker attributes file name (str). 
""" - logger.debug(f'config.Harvester.__dict__ : {config.Harvester.__dict__}') - if args.harvester_workdir != '': work_dir = args.harvester_workdir else: work_dir = os.environ['PILOT_HOME'] - worker_attributes_file = config.Harvester.workerattributesfile - worker_attributes_file = os.path.join(work_dir, worker_attributes_file) - logger.debug(f'worker_attributes_file = {worker_attributes_file}') - return worker_attributes_file + return os.path.join(work_dir, config.Harvester.workerattributes_file) def findfile(path: str, name: str) -> str: diff --git a/pilot/util/https.py b/pilot/util/https.py index 65cf9335..0f9d1506 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -44,9 +44,12 @@ import urllib.error import urllib.parse from collections import namedtuple +from gzip import GzipFile +from io import BytesIO from re import findall from time import sleep, time from typing import Callable, Any +from urllib.parse import parse_qs from .config import config from .constants import get_pilot_version @@ -118,7 +121,6 @@ def cacert_default_location() -> Any: return f'/tmp/x509up_u{os.getuid()}' except AttributeError: logger.warning('no UID available? System not POSIX-compatible... trying to continue') - pass return None @@ -183,7 +185,7 @@ def request(url: str, data: dict = {}, plain: bool = False, secure: bool = True, Send a request using HTTPS. Sends :mailheader:`User-Agent` and certificates previously being set up by `https_setup`. - If `ssl.SSLContext` is available, uses `urllib2` as a request processor. Otherwise uses :command:`curl`. + If `ssl.SSLContext` is available, uses `urllib2` as a request processor. Otherwise, uses :command:`curl`. If ``data`` is provided, encodes it as a URL form data and sends it to the server. @@ -453,7 +455,6 @@ def send_update(update_function: str, data: dict, url: str, port: str, job: Any :param ipv: internet protocol version, IPv4 or IPv6 (str) :return: server response (dict). 
""" - time_before = int(time()) max_attempts = 10 attempt = 0 done = False @@ -488,29 +489,58 @@ def send_update(update_function: str, data: dict, url: str, port: str, job: Any attempt += 1 continue # send the heartbeat + res = send_request(pandaserver, update_function, data, job, ipv) + if res is not None: + done = True + attempt += 1 + if not done: + sleep(config.Pilot.update_sleep) + + return res + + +def send_request(pandaserver: str, update_function: str, data: dict, job: Any, ipv: str) -> dict or None: + """ + Send the request to the server using the appropriate method. + + :param pandaserver: PanDA server URL (str) + :param update_function: update function (str) + :param data: data dictionary (dict) + :param job: job object (Any) + :param ipv: internet protocol version (str) + :return: server response (dict or None). + """ + res = None + time_before = int(time()) + + # first try the new request2 method based on urllib. If that fails, revert to the old request method using curl + try: + res = request2(f'{pandaserver}/server/panda/{update_function}', data=data) + except Exception as exc: + logger.warning(f'exception caught in https.request(): {exc}') + logger.debug(f'type(res)={type(res)}') + if not res: + logger.warning('failed to send request using urllib based request2(), will try curl based request()') try: res = request(f'{pandaserver}/server/panda/{update_function}', data=data, ipv=ipv) except Exception as exc: logger.warning(f'exception caught in https.request(): {exc}') - else: - if res is not None: - done = True - txt = f'server {update_function} request completed in {int(time()) - time_before}s' - if job: - txt += f' for job {job.jobid}' - logger.info(txt) - # hide sensitive info - pilotsecrets = '' - if res and 'pilotSecrets' in res: - pilotsecrets = res['pilotSecrets'] - res['pilotSecrets'] = '********' - logger.info(f'server responded with: res = {res}') - if pilotsecrets: - res['pilotSecrets'] = pilotsecrets - attempt += 1 - if not done: - 
sleep(config.Pilot.update_sleep) + if res: + txt = f'server {update_function} request completed in {int(time()) - time_before}s' + if job: + txt += f' for job {job.jobid}' + logger.info(txt) + # hide sensitive info + pilotsecrets = '' + if res and 'pilotSecrets' in res: + pilotsecrets = res['pilotSecrets'] + res['pilotSecrets'] = '********' + logger.info(f'server responded with: res = {res}') + if pilotsecrets: + res['pilotSecrets'] = pilotsecrets + else: + logger.warning(f'server {update_function} request failed both with urllib and curl') return res @@ -628,51 +658,15 @@ def get_server_command(url: str, port: str, cmd: str = 'getJob') -> str: return f'{url}/server/panda/{cmd}' -def request2_bad(url: str, data: dict = {}) -> str: - """ - Send a request using HTTPS. - - :param url: the URL of the resource (str) - :param data: data to send (dict) - :return: server response (str). - """ - - # convert the dictionary to a JSON string - data_json = json.dumps(data).encode('utf-8') - - # Create an SSLContext object - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - logger.debug(f'capath={_ctx.capath}') - logger.debug(f'cacert={_ctx.cacert}') - ssl_context.load_verify_locations(_ctx.capath) - ssl_context.load_cert_chain(_ctx.cacert) - # define additional headers - headers = { - "Content-Type": "application/json", - "User-Agent": _ctx.user_agent, - } - - # create a request object with the SSL context - request = urllib.request.Request(url, data=data_json, headers=headers, method='POST') - - # perform the HTTP request with the SSL context - try: - response = urllib.request.urlopen(request, context=ssl_context) - ret = response.read().decode('utf-8') - except (urllib.error.URLError, urllib.error.HTTPError) as exc: - logger.warning(f'failed to send request: {exc}') - ret = "" - - return ret - - -def request2(url: str, data: dict = {}) -> str: +def request2(url: str = "", data: dict = {}, secure: bool = True, compressed: bool = True) -> str or dict: """ Send a request 
using HTTPS (using urllib module). :param url: the URL of the resource (str) :param data: data to send (dict) - :return: server response (str). + :param secure: use secure connection (bool) + :param compressed: compress data (bool) + :return: server response (str or dict). """ # https might not have been set up if running in a [middleware] container if not _ctx.cacert: @@ -686,34 +680,76 @@ def request2(url: str, data: dict = {}) -> str: } logger.debug(f'headers={headers}') - - # Encode data as JSON - data_json = json.dumps(data).encode('utf-8') - #data_json = urllib.parse.quote(json.dumps(data)) - #data_json = data_json.encode('utf-8') - - logger.debug(f'data_json={data_json}') + logger.info(f'data = {data}') + + # Encode data as compressed JSON + if compressed: + rdata_out = BytesIO() + with GzipFile(fileobj=rdata_out, mode="w") as f_gzip: + f_gzip.write(json.dumps(data).encode()) + data_json = rdata_out.getvalue() + else: + data_json = json.dumps(data).encode('utf-8') + #data_json = urllib.parse.quote(json.dumps(data)) + #data_json = data_json.encode('utf-8') + #data_json = urllib.parse.urlencode(data).encode() # Set up the request req = urllib.request.Request(url, data_json, headers=headers) # Create a context with certificate verification - logger.debug(f'cacert={_ctx.cacert}') # /alrb/x509up_u25606_prod - logger.debug(f'capath={_ctx.capath}') # /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/etc/grid-security-emi/certificates + #logger.debug(f'cacert={_ctx.cacert}') # /alrb/x509up_u25606_prod + #logger.debug(f'capath={_ctx.capath}') # /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/etc/grid-security-emi/certificates #context = ssl.create_default_context(cafile=_ctx.cacert, capath=_ctx.capath) #logger.debug(f'context={context}') - ssl_context = ssl.create_default_context(capath=_ctx.capath, cafile=_ctx.cacert) + # should be + # ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT) + # but it doesn't work, so use this for now even if it throws a deprecation 
warning + logger.info(f'ssl.OPENSSL_VERSION_INFO={ssl.OPENSSL_VERSION_INFO}') + try: # for ssl version 3.0 and python 3.10+ + # ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT) + ssl_context = ssl.SSLContext(protocol=None) + except Exception: # for ssl version 1.0 + ssl_context = ssl.SSLContext() + + #ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.load_cert_chain(certfile=_ctx.cacert, keyfile=_ctx.cacert) + + if not secure: + ssl_context.verify_mode = False + ssl_context.check_hostname = False + + # ssl_context = ssl.create_default_context(capath=_ctx.capath, cafile=_ctx.cacert) # Send the request securely try: + logger.debug('sending data to server') with urllib.request.urlopen(req, context=ssl_context) as response: # Handle the response here - logger.debug(response.status, response.reason) - logger.debug(response.read().decode('utf-8')) + logger.debug(f"response.status={response.status}, response.reason={response.reason}") ret = response.read().decode('utf-8') + if 'getProxy' not in url: + logger.debug(f"response={ret}") + logger.debug('sent request to server') except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as exc: logger.warning(f'failed to send request: {exc}') ret = "" + else: + if secure and isinstance(ret, str): + if ret.startswith('{') and ret.endswith('}'): + logger.debug('loading string into dictionary') + try: + ret = json.loads(ret) + except Exception as e: + logger.warning(f'failed to parse response: {e}') + else: + logger.debug('parsing string into dictionary') + # For panda server interactions, the response should be in dictionary format + # Parse the query string into a dictionary + query_dict = parse_qs(ret) + + # Convert lists to single values + ret = {k: v[0] if len(v) == 1 else v for k, v in query_dict.items()} return ret @@ -762,3 +798,68 @@ def request3(url: str, data: dict = {}) -> str: ret = "" return ret + + +def upload_file(url: str, path: str) -> bool: + """ + Upload the contents of the given 
JSON file to the given URL. + + :param url: server URL (str) + :param path: path to the file (str) + :return: True if success, False otherwise (bool). + """ + status = False + # Define headers + headers = { + "Content-Type": "application/json" + } + + # Read file contents + with open(path, 'rb') as file: + file_content = file.read() + + # Define request object + request = urllib.request.Request(url, data=file_content, headers=headers, method='POST') + + # Set timeouts + request.timeout = 20 + request.socket_timeout = 120 + + # Perform the request + ret = 'notok' + try: + with urllib.request.urlopen(request) as response: + response_data = response.read() + # Handle response + ret = response_data.decode('utf-8') + except urllib.error.URLError as e: + # Handle URL errors + logger.warning("URL Error:", e) + ret = e + + if ret == 'ok': + status = True + else: + logger.warning(f'failed to send data to {url}: response={ret}') + + return status + + +def download_file(url: str, _timeout: int = 20) -> str: + """ + Download url content. + + :param url: url (str) + :return: url content (str). 
+ """ + req = urllib.request.Request(url) + req.add_header('User-Agent', ctx.user_agent) + try: + with urllib.request.urlopen(req, context=ctx.ssl_context, timeout=_timeout) as response: + content = response.read() + except urllib.error.URLError as exc: + logger.warning(f"error occurred with urlopen: {exc.reason}") + # Handle the error, set content to None or handle as needed + content = "" + + return content diff --git a/pilot/util/loopingjob.py b/pilot/util/loopingjob.py index 25fa7934..74c7fad6 100644 --- a/pilot/util/loopingjob.py +++ b/pilot/util/loopingjob.py @@ -93,8 +93,13 @@ def looping_job(job: Any, montime: Any) -> (int, str): # correct for job suspension if detected time_since_job_suspension = time_since_suspension() if time_since_job_suspension: - logger.info(f'looping job killer adjusting for job suspension: {time_since_job_suspension} s (adding to time_last_touched))') - time_last_touched += time_since_job_suspension + # if there was no measurement, the time_last_touched will be zero + if not time_last_touched: + logger.warning('no time_last_touched measurement found (setting to 0)') + return 0, "" + else: + logger.info(f'looping job killer adjusting for job suspension: {time_since_job_suspension} s (adding to time_last_touched))') + time_last_touched += time_since_job_suspension # the payload process is considered to be looping if it's files have not been touched within looping_limit time if time_last_touched: diff --git a/pilot/util/middleware.py b/pilot/util/middleware.py index 97ff16a6..2ded96c0 100644 --- a/pilot/util/middleware.py +++ b/pilot/util/middleware.py @@ -82,40 +82,39 @@ def containerise_general_command(job, container_options, label='command', contai logger.debug(f'{label} script returned exit_code={exit_code}') -def containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite, container_options, external_dir, - label='stage-in', container_type='container', rucio_host=''): +def containerise_middleware(job, args, xdata, 
eventtype, localsite, remotesite, container_options, + label='stage-in', container_type='container'): """ Containerise the middleware by performing stage-in/out steps in a script that in turn can be run in a container. Note: a container will only be used for option container_type='container'. If this is 'bash', then stage-in/out will still be done by a script, but not containerised. - Note: this function is tailor made for stage-in/out. + Note: this function is tailormade for stage-in/out. :param job: job object. - :param xdata: list of FileSpec objects. - :param queue: queue name (string). + :param args: command line arguments (dict) + :param xdata: list of FileSpec objects :param eventtype: :param localsite: :param remotesite: - :param container_options: container options from queuedata (string). - :param external_dir: input or output files directory (string). - :param label: optional 'stage-in/out' (string). - :param container_type: optional 'container/bash' (string). - :param rucio_host: optiona rucio host (string). 
+ :param container_options: container options from queuedata (str) + :param label: optional 'stage-in/out' (str) + :param container_type: optional 'container/bash' (str) :raises StageInFailure: for stage-in failures :raises StageOutFailure: for stage-out failures :return: """ cwd = getcwd() + external_dir = args.input_dir if label == 'stage-in' else args.output_dir # get the name of the stage-in/out isolation script script = config.Container.middleware_container_stagein_script if label == 'stage-in' else config.Container.middleware_container_stageout_script try: - cmd = get_command(job, xdata, queue, script, eventtype, localsite, remotesite, external_dir, label=label, - container_type=container_type, rucio_host=rucio_host) + cmd = get_command(job, xdata, args.queue, script, eventtype, localsite, remotesite, external_dir, label=label, + container_type=container_type, rucio_host=args.rucio_host) except PilotException as exc: raise exc diff --git a/pilot/util/proxy.py b/pilot/util/proxy.py index d2f511f0..703d79c9 100644 --- a/pilot/util/proxy.py +++ b/pilot/util/proxy.py @@ -17,7 +17,7 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24 import logging import os @@ -32,14 +32,14 @@ logger = logging.getLogger(__name__) -def get_distinguished_name(): +def get_distinguished_name() -> str: """ Get the user DN. + Note: the DN is also sent by the server to the pilot in the job description (produserid). - :return: User DN (string). + :return: User DN (str). """ - dn = "" executable = 'arcproxy -i subject' exit_code, stdout, stderr = execute(executable) @@ -86,15 +86,16 @@ def vomsproxyinfo(options='-all', mute=False, path=''): return exit_code, stdout, stderr -def get_proxy(proxy_outfile_name, voms_role): +def get_proxy(proxy_outfile_name: str, voms_role: str) -> (bool, str): """ Download and store a proxy. + E.g. 
on read-only file systems (K8), the default path is not available, in which case the new proxy will be stored in the workdir (return the updated path). - :param proxy_outfile_name: specify the file to store proxy (string). - :param voms_role: what proxy (role) to request, e.g. 'atlas' (string). - :return: result (Boolean), updated proxy path (string). + :param proxy_outfile_name: specify the file to store proxy (str) + :param voms_role: what proxy (role) to request, e.g. 'atlas' (str) + :return: result (Boolean), updated proxy path (str). """ try: # it assumes that https_setup() was done already @@ -104,10 +105,13 @@ def get_proxy(proxy_outfile_name, voms_role): user = __import__('pilot.user.%s.proxy' % pilot_user, globals(), locals(), [pilot_user], 0) data = user.getproxy_dictionary(voms_role) - res = https.request('{pandaserver}/server/panda/getProxy'.format(pandaserver=url), data=data) + res = https.request2(f'{url}/server/panda/getProxy', data=data) if res is None: - logger.error(f"unable to get proxy with role '{voms_role}' from panda server") - return False, proxy_outfile_name + logger.error(f"unable to get proxy with role '{voms_role}' from panda server using urllib method") + res = https.request('{url}/server/panda/getProxy', data=data) + if res is None: + logger.error(f"unable to get proxy with role '{voms_role}' from panda server using curl method") + return False, proxy_outfile_name if res['StatusCode'] != 0: logger.error(f"panda server returned: \'{res['errorDialog']}\' for proxy role \'{voms_role}\'") @@ -118,10 +122,6 @@ def get_proxy(proxy_outfile_name, voms_role): except Exception as exc: logger.error(f"Get proxy from panda server failed: {exc}, {traceback.format_exc()}") return False, proxy_outfile_name - else: - # dump voms-proxy-info -all to log - if res and res['StatusCode'] == 0: - _, _, _ = vomsproxyinfo(options='-all', path=proxy_outfile_name) def create_file(filename, contents): """ @@ -134,7 +134,7 @@ def create_file(filename, contents): 
result = False try: # pre-create empty proxy file with secure permissions. Prepare it for write_file() which can not - # set file permission mode, it will writes to the existing file with correct permissions. + # set file permission mode, it will write to the existing file with correct permissions. result = create_file(proxy_outfile_name, proxy_contents) except (IOError, OSError, FileHandlingFailure) as exc: logger.error(f"exception caught:\n{exc},\ntraceback: {traceback.format_exc()}") @@ -148,6 +148,9 @@ def create_file(filename, contents): else: logger.debug('updating X509_USER_PROXY to alternative path {path} (valid until end of current job)') os.environ['X509_USER_PROXY'] = proxy_outfile_name + else: + # dump voms-proxy-info -all to log + _, _, _ = vomsproxyinfo(options='-all', path=proxy_outfile_name) return result, proxy_outfile_name diff --git a/pilot/util/tracereport.py b/pilot/util/tracereport.py index 46cb3e11..9c73c6be 100644 --- a/pilot/util/tracereport.py +++ b/pilot/util/tracereport.py @@ -25,15 +25,16 @@ import socket import time from sys import exc_info -from json import dumps +from json import dumps, loads from os import environ, getuid from pilot.common.exception import FileHandlingFailure +from pilot.util.auxiliary import correct_none_types from pilot.util.config import config from pilot.util.constants import get_pilot_version, get_rucio_client_version from pilot.util.container import execute, execute2 from pilot.util.filehandling import append_to_file, write_file -# from pilot.util.https import request3 +from pilot.util.https import request2 import logging logger = logging.getLogger(__name__) @@ -115,7 +116,7 @@ def init(self, job): try: self['hostname'] = socket.gethostbyaddr(hostname)[0] - except socket.herror as exc: + except (socket.gaierror, socket.herror) as exc: logger.warning(f'unable to detect hostname by address for trace report: {exc}') self['hostname'] = 'unknown' @@ -192,20 +193,36 @@ def send(self): err = None try: # take care of 
the encoding - data = dumps(self).replace('"', '\\"') + data = dumps(self).replace('"', '\\"') # for curl + data_urllib = dumps(self) # for urllib # remove the ipv and workdir items since they are for internal pilot use only data = data.replace(f'\"ipv\": \"{self.ipv}\", ', '') data = data.replace(f'\"workdir\": \"{self.workdir}\", ', '') + try: + data_urllib = data_urllib.replace(f'\"ipv\": \"{self.ipv}\", ', '') + data_urllib = data_urllib.replace(f'\"workdir\": \"{self.workdir}\", ', '') + except Exception as e: + logger.warning(f'failed to remove ipv and workdir from data_urllib: {e}') + logger.debug(f'data (type={type(data)})={data}') + logger.debug(f'data_urllib (type={type(data_urllib)})={data_urllib}') + # send the trace report using the new request2 function + # must convert data to a dictionary and make sure None values are kept + data_str_urllib = data_urllib.replace('None', '\"None\"') + data_str_urllib = data_str_urllib.replace('null', '\"None\"') + logger.debug(f'data_str_urllib={data_str_urllib}') + data_dict = loads(data_str_urllib) # None values will now be 'None'-strings + data_dict = correct_none_types(data_dict) + logger.debug(f'data_dict={data_dict}') + ret = request2(url=url, data=data_dict, secure=False, compressed=False) + logger.info(f'received: {ret}') + if ret: + logger.info("tracing report sent") + return True + else: + logger.warning("failed to send tracing report - using old curl command") ssl_certificate = self.get_ssl_certificate() - #ret = request3(url, data) - #if ret: - # logger.info("tracing report sent") - # return True - #else: - # logger.warning("failed to send tracing report - using old curl command") - # create the command command = 'curl' if self.ipv == 'IPv4': @@ -228,11 +245,14 @@ def send(self): # handle errors that only appear in stdout/err (curl) if not exit_code: out, err = self.get_trace_curl_files(outname, errname, mode='r') - exit_code = self.assign_error(out) - if not exit_code: - exit_code = 
self.assign_error(err) - logger.debug(f'curl exit_code from stdout/err={exit_code}') - self.close(out, err) + if out: + exit_code = self.assign_error(out) + if not exit_code: + exit_code = self.assign_error(err) + logger.debug(f'curl exit_code from stdout/err={exit_code}') + self.close(out, err) + else: + logger.warning(f'failed to open curl stdout file: {outname}') if not exit_code: logger.info('no errors were detected from curl operation') else: @@ -297,9 +317,9 @@ def get_trace_curl_filenames(self, name='trace_curl'): :param name: name pattern (str) :return: stdout file name (str), stderr file name (str). """ - - workdir = self.workdir if self.workdir else os.getcwd() - return os.path.join(workdir, f'{name}.stdout'), os.path.join(workdir, f'{name}.stderr') + #workdir = self.workdir if self.workdir else os.getcwd() + #return os.path.join(workdir, f'{name}.stdout'), os.path.join(workdir, f'{name}.stderr') + return f'{name}.stdout', f'{name}.stderr' def get_trace_curl_files(self, outpath, errpath, mode='wb'): """ diff --git a/pilot/workflow/generic_hpc.py b/pilot/workflow/generic_hpc.py index adfaa969..98d2c2c4 100644 --- a/pilot/workflow/generic_hpc.py +++ b/pilot/workflow/generic_hpc.py @@ -81,8 +81,8 @@ def run(args): else: communication_point = os.getcwd() work_report = get_initial_work_report() - worker_attributes_file = config.Harvester.workerAttributesFile - worker_stageout_declaration = config.Harvester.StageOutnFile + worker_attributes_file = config.Harvester.workerattributes_file + worker_stageout_declaration = config.Harvester.stageoutn_file payload_report_file = config.Payload.jobreport payload_stdout_file = config.Payload.payloadstdout payload_stderr_file = config.Payload.payloadstderr