From 295210a91e0b73733bf33917876d13ccf4c9986d Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Mon, 9 Sep 2024 16:00:00 +0700 Subject: [PATCH 01/15] Implemented alternative stage-out (altstageout) to different RSE specified in PQ.astorages on primary stage-out transfer failures (for output and log files) --- pilot/api/data.py | 26 +++++++++++++++++--------- pilot/control/data.py | 33 +++++++++++++++++++++++---------- pilot/info/filespec.py | 7 +++++++ pilot/info/jobdata.py | 35 ++++++++++++++++++++++++++++++++++- pilot/info/queuedata.py | 15 +++++++++++++++ 5 files changed, 96 insertions(+), 20 deletions(-) diff --git a/pilot/api/data.py b/pilot/api/data.py index 3305b47f..b5b96005 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -87,8 +87,7 @@ def __init__(self, default_copytools: str = 'rucio', trace_report: dict = None, ipv: str = 'IPv6', - workdir: str = "", - altstageout: str = None): + workdir: str = ""): """ Set default/init values. @@ -113,7 +112,6 @@ def __init__(self, self.infosys = infosys_instance or infosys self.ipv = ipv self.workdir = workdir - self.altstageout = altstageout if isinstance(acopytools, str): acopytools = {'default': [acopytools]} if acopytools else {} @@ -522,7 +520,7 @@ def transfer_files(self, copytool: Any, files: list, activity: list, **kwargs: d """ raise NotImplementedError() - def transfer(self, files: list, activity: list or str = 'default', **kwargs: dict) -> list: # noqa: C901 + def transfer(self, files: list, activity: list or str = 'default', raise_exception: bool = True, **kwargs: dict) -> list: # noqa: C901 """ Perform file transfer. @@ -530,8 +528,9 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic :param files: list of `FileSpec` objects (list) :param activity: list of activity names used to determine appropriate copytool (list or str) + :param raise_exception: boolean flag used to ignore transfer errors :param kwargs: extra kwargs to be passed to copytool transfer handler (dict) - :raise: PilotException in case of controlled error + :raise: PilotException in case of controlled error if `raise_exception` is `True` :return: list of processed `FileSpec` objects (list). """ self.trace_report.update(relativeStart=time.time(), transferStart=time.time()) @@ -548,7 +547,8 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic break if not copytools: - raise PilotException(f'failed to resolve copytool by preferred activities={activity}, acopytools={self.acopytools}') + raise PilotException(f'failed to resolve copytool by preferred activities={activity}, acopytools={self.acopytools}', + code=ErrorCodes.UNKNOWNCOPYTOOL) # populate inputddms if needed self.prepare_inputddms(files) @@ -1109,20 +1109,28 @@ def prepare_destinations(self, files: list, activities: list or str) -> list: # take the fist choice for now, extend the logic later if need ddm = storages[0] + ddm_alt = storages[1] if len(storages) > 1 else None self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}") - self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination ddm={ddm}") + self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}") for e in files: if not e.ddmendpoint: # no preferences => use default destination self.logger.info("[prepare_destinations][%s]: fspec.ddmendpoint is not set for lfn=%s" - " .. will use default ddm=%s as (local) destination", activity, e.lfn, ddm) + " .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt) e.ddmendpoint = ddm + e.ddmendpoint_alt = ddm_alt elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations" " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint) e.ddmendpoint = ddm - e.ddmendpoint_alt = e.ddmendpoint # consider me later + e.ddmendpoint_alt = e.ddmendpoint + else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list) + cur = storages.index(e.ddmendpoint) + ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end + e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None + self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s", + activity, e.ddmendpoint_alt, e.ddmendpoint) return files diff --git a/pilot/control/data.py b/pilot/control/data.py index 3c76a9b9..712903de 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -909,14 +909,31 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: # create the trace report trace_report = create_trace_report(job, label=label) - client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir, altstageout=job.altstageout) + client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir) kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job, 'output_dir': args.output_dir, 'catchall': job.infosys.queuedata.catchall, 'rucio_host': args.rucio_host} #, mode='stage-out') + is_unified = job.infosys.queuedata.type == 'unified' # prod analy unification: use destination preferences from PanDA server for unified queues - if job.infosys.queuedata.type != 'unified': + if not is_unified: client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow) - client.transfer(xdata, activity, **kwargs) + + altstageout = not is_unified and job.allow_altstageout() # do not use alt stage-out for unified queues + client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs) + remain_files = [entry for entry in xdata if entry.require_transfer()] + # check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt) + has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files) + + logger.info('alt stage-out settings[%s]: is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', + activity, is_unified, len(remain_files), has_altstorage) + + if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers + for entry in remain_files: + entry.ddmendpoint = entry.ddmendpoint_alt + entry.ddmendpoint_alt = None + + client.transfer(xdata, activity, **kwargs) + except PilotException as error: error_msg = traceback.format_exc() logger.error(error_msg) @@ -936,14 +953,10 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: logger.info(f'switched back proxy on unified dispatch queue: X509_USER_PROXY={x509_org} (reset X509_UNIFIED_DISPATCH)') logger.info('summary of transferred files:') - for iofile in xdata: - if not iofile.status: - status = "(not transferred)" - else: - status = iofile.status - logger.info(" -- lfn=%s, status_code=%s, status=%s", iofile.lfn, iofile.status_code, status) + for entry in xdata: + logger.info(" -- lfn=%s, status_code=%s, status=%s", entry.lfn, entry.status_code, entry.status or "(not transferred)") - remain_files = [iofile for iofile in xdata if iofile.status not in ['transferred']] + remain_files = [entry for entry in xdata if entry.status not in ['transferred']] return not remain_files diff --git a/pilot/info/filespec.py b/pilot/info/filespec.py index d7e8e906..307b18c9 100644 --- a/pilot/info/filespec.py +++ b/pilot/info/filespec.py @@ -216,3 +216,10 @@ def get_storage_id_and_path_convention(self) -> (str, str): logger.info(f'storage_id: {storage_id}, path_convention: {path_convention}') return storage_id, path_convention + + def require_transfer(self) -> bool: + """ + Check if File needs to be transferred (in error state or never has been started) + """ + + return self.status not in ['remote_io', 'transferred', 'no_transfer'] diff --git a/pilot/info/jobdata.py b/pilot/info/jobdata.py index f936dbbf..2cc149be 100644 --- a/pilot/info/jobdata.py +++ b/pilot/info/jobdata.py @@ -169,7 +169,7 @@ class JobData(BaseData): dask_scheduler_ip = '' # enhanced job definition for Dask jobs jupyter_session_ip = '' # enhanced job definition for Dask jobs minramcount = 0 # minimum number of RAM required by the payload - altstageout = None # alternative stage-out method, on, off, force + altstageout = None # alternative stage-out method: boolean, on (True), off (False) # home package string with additional payload release information; does not need to be added to # the conversion function since it's already lower case homepackage = "" # home package for TRF @@ -575,6 +575,26 @@ def has_remoteio(self) -> bool: """ return any(fspec.status == 'remote_io' for fspec in self.indata) + def allow_altstageout(self): + """ + Resolve if alternative stageout is allowed for given job taking into account `queuedata` settings as well. + `queuedata` specific settings overwrites job preferences for altstageout. + + :return: boolean. + """ + + # consider first the queue specific settings (if any) + if self.infosys and self.infosys.queuedata: + qval = self.infosys.queuedata.altstageout + if qval is not None: + return qval + else: + logger.info('job.infosys.queuedata is not initialized: PandaQueue specific settings for altstageout will not be considered') + + # apply additional job specific checks here if need + + return bool(self.altstageout) + def clean(self): """ Validate and finally clean up required data values (object properties) if needed. @@ -628,6 +648,19 @@ def clean__platform(self, raw: Any, value: str) -> str: return v + def clean__altstageout(self, raw: Any, value: str) -> Any: + """ + Verify and validate value for the altstageout key. + + :param raw: raw value (Any) + :param value: parsed value (str) + :return: cleaned value (bool or None). + """ + if value == 'on': + return True + if value == 'off': + return False + def clean__jobparams(self, raw: Any, value: str) -> str: """ Verify and validate value for the jobparams key. diff --git a/pilot/info/queuedata.py b/pilot/info/queuedata.py index 3e3ee1b7..5508c260 100644 --- a/pilot/info/queuedata.py +++ b/pilot/info/queuedata.py @@ -96,6 +96,8 @@ class QueueData(BaseData): is_cvmfs = True # has cvmfs installed memkillgrace = 100 # memory kill grace value in percentage + altstageout = None # allow altstageout: force (True) or disable (False) or no preferences (None) + # specify the type of attributes for proper data validation and casting _keys = {int: ['timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap', 'corecount', 'maxrss', 'maxtime', 'maxinputsize', 'memkillgrace'], @@ -171,6 +173,16 @@ def resolve_allowed_schemas(self, activity: str or list, copytool: str = None) - return adat.get(copytool) or [] + def allow_altstageout(self): + """ + Resolve if alternative stageout should be forced (True) or disabled (False); None value means no preferences defined. + :return: boolean or None + """ + + val = self.params.get('allow_altstageout', None) + if val is not None: # cast to bool + return bool(val) + def clean(self): """Validate and finally clean up required data values (required object properties) if needed.""" # validate es_stageout_gap value @@ -197,6 +209,9 @@ def clean(self): self.container_options = self.container_options.replace(" --contain", ",${workdir} --contain") logger.info(f"note: added missing $workdir to container_options: {self.container_options}") + # set altstageout settings + self.altstageout = self.allow_altstageout() + ## custom function pattern to apply extra validation to the key values ##def clean__keyname(self, raw, value): ## :param raw: raw value passed from ext source as input From df9a4d9ddd21c094430ab9a30707a8775d3e06ca Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 9 Sep 2024 16:50:40 +0200 Subject: [PATCH 02/15] New version --- PILOTVERSION | 2 +- pilot/util/constants.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 4b3244f5..bd3ffbfc 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.1.66 \ No newline at end of file +3.8.2.1 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 0a4003b3..9e37aba5 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '66' # build number should be reset to '1' for every new development cycle +REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '1' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 770ebf58649bceb1ac37350238c176032a668e89 Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 15:52:46 +0700 Subject: [PATCH 03/15] typo fix --- pilot/control/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/control/data.py b/pilot/control/data.py index 712903de..4c36d70a 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -925,7 +925,7 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files) logger.info('alt stage-out settings[%s]: is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', - activity, is_unified, len(remain_files), has_altstorage) + activity, is_unified, altstageout, len(remain_files), has_altstorage) if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers for entry in remain_files: From f3ea527db1d61f635548806927206b46a777213b Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 19:27:59 +0700 Subject: [PATCH 04/15] fixes --- pilot/api/data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pilot/api/data.py b/pilot/api/data.py index b5b96005..23831cc6 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -633,7 +633,8 @@ def transfer(self, files: list, activity: list or str = 'default', raise_excepti code = ErrorCodes.STAGEINFAILED if self.mode == 'stage-in' else ErrorCodes.STAGEOUTFAILED # is it stage-in/out? details = str(caught_errors) + ":" + f'failed to transfer files using copytools={copytools}' self.logger.fatal(details) - raise PilotException(details, code=code) + if raise_exception: + raise PilotException(details, code=code) return files From 9d7309e07a1105c1f610f636e92b7d58c7fca6fd Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 19:42:45 +0700 Subject: [PATCH 05/15] Fix job infosys initialization --- pilot/control/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/control/job.py b/pilot/control/job.py index d01d9b45..f9b80c25 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -2365,7 +2365,7 @@ def create_job(dispatcher_response: dict, queuename: str) -> Any: job = JobData(dispatcher_response) jobinfosys = InfoService() jobinfosys.init(queuename, infosys.confinfo, infosys.extinfo, JobInfoProvider(job)) - job.init(infosys) + job.init(jobinfosys) logger.info(f'received job: {job.jobid} (sleep until the job has finished)') From f13770adeefc2c9b9a016c65770c5e19989d0052 Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 20:27:59 +0700 Subject: [PATCH 06/15] minor fix --- pilot/api/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/api/data.py b/pilot/api/data.py index 23831cc6..76be4e73 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -1124,8 +1124,8 @@ def prepare_destinations(self, files: list, activities: list or str) -> list: elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations" " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint) + e.ddmendpoint_alt = e.ddmendpoint # verify me e.ddmendpoint = ddm - e.ddmendpoint_alt = e.ddmendpoint else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list) cur = storages.index(e.ddmendpoint) ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end From 97d720242a56416f07edc4d98a6f91a8507521a8 Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 22:38:05 +0700 Subject: [PATCH 07/15] report back file.ddmendpoint value for alternative staged files --- pilot/control/data.py | 22 +++++++++++++++------- pilot/info/filespec.py | 1 + 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pilot/control/data.py b/pilot/control/data.py index 4c36d70a..506ce68a 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -924,14 +924,16 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: # check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt) has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files) - logger.info('alt stage-out settings[%s]: is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', + logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', activity, is_unified, altstageout, len(remain_files), has_altstorage) if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers for entry in remain_files: entry.ddmendpoint = entry.ddmendpoint_alt entry.ddmendpoint_alt = None + entry.is_altstaged = True + logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files)) client.transfer(xdata, activity, **kwargs) except PilotException as error: @@ -1072,12 +1074,18 @@ def generate_fileinfo(job: JobData) -> dict: """ fileinfo = {} checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum' - for iofile in job.outdata + job.logdata: - if iofile.status in {'transferred'}: - fileinfo[iofile.lfn] = {'guid': iofile.guid, - 'fsize': iofile.filesize, - f'{checksum_type}': iofile.checksum.get(config.File.checksum_type), - 'surl': iofile.turl} + for entry in job.outdata + job.logdata: + if entry.status in {'transferred'}: + dat = { + 'guid': entry.guid, + 'fsize': entry.filesize, + f'{checksum_type}': entry.checksum.get(config.File.checksum_type), + 'surl': entry.turl + } + if entry.is_altstaged: + dat['ddmendpoint'] = entry.ddmendpoint + + fileinfo[entry.lfn] = dat return fileinfo diff --git a/pilot/info/filespec.py b/pilot/info/filespec.py index 307b18c9..ef4700ed 100644 --- a/pilot/info/filespec.py +++ b/pilot/info/filespec.py @@ -77,6 +77,7 @@ class FileSpec(BaseData): is_tar = False # whether it's a tar file or not ddm_activity = None # DDM activity names (e.g. [read_lan, read_wan]) which should be used to resolve appropriate protocols from StorageData.arprotocols checkinputsize = True + is_altstaged = None # indicates if file was transferred using alternative method (altstageout) # specify the type of attributes for proper data validation and casting _keys = {int: ['filesize', 'mtime', 'status_code'], From a292eb8d8774cb6db6e2c76b9f4af33d8a9bb945 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 17 Sep 2024 11:53:13 +0200 Subject: [PATCH 08/15] Improved IPv6 info extraction --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/networking.py | 26 +++++++++++++++++++++++--- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index bd3ffbfc..ebf6e6f6 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.2.1 \ No newline at end of file +3.8.2.2 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 9e37aba5..8c0bcef3 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '1' # build number should be reset to '1' for every new development cycle +BUILD = '2' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/networking.py b/pilot/util/networking.py index 5e03368d..d081b784 100644 --- a/pilot/util/networking.py +++ b/pilot/util/networking.py @@ -35,7 +35,7 @@ def dump_ipv6_info() -> None: """Dump the IPv6 info to the log.""" cmd = 'ifconfig' if not is_command_available(cmd): - _cmd = '/usr/sbin/ifconfig' + _cmd = '/usr/sbin/ifconfig -a' if not is_command_available(_cmd): logger.warning(f'command {cmd} is not available - this WN might not support IPv6') return @@ -43,15 +43,35 @@ def dump_ipv6_info() -> None: _, stdout, stderr = execute(cmd, timeout=10) if stdout: - ipv6 = extract_ipv6(stdout) + ipv6 = extract_ipv6_addresses(stdout) if ipv6: logger.info(f'IPv6 addresses: {ipv6}') else: - logger.warning('no IPv6 addresses found - this WN does not support IPv6') + logger.warning('no IPv6 addresses were found') else: logger.warning(f'failed to run ifconfig: {stderr}') +def extract_ipv6_addresses(ifconfig_output: str) -> list: + """Extracts IPv6 addresses from ifconfig output. + + Args: + ifconfig_output: The output of the ifconfig command. + + Returns: + A list of IPv6 addresses. + """ + + ipv6_addresses = [] + for line in ifconfig_output.splitlines(): + line = line.strip().replace("\t", " ").replace("\r", "").replace("\n", "") + match = re.search(r"inet6 (.*?)\s", line) + if match and match.group(1) != "::1": # skip loopback address + ipv6_addresses.append(match.group(1)) + + return ipv6_addresses + + def extract_ipv6(ifconfig: str) -> str: """ Extract the IPv6 address from the ifconfig output. From 3e6ef76c0514fa85c043d328471717efc767fc3c Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 18 Sep 2024 11:01:16 +0200 Subject: [PATCH 09/15] Spelling correction. Debug info --- pilot/api/data.py | 2 +- pilot/control/data.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pilot/api/data.py b/pilot/api/data.py index 3305b47f..a0748044 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -133,7 +133,7 @@ def __init__(self, self.trace_report = trace_report if trace_report else TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), ipv=self.ipv, workdir=self.workdir) if not self.acopytools: - msg = f'failed to initilize StagingClient: no acopytools options found, acopytools={self.acopytools}' + msg = f'failed to initialize StagingClient: no acopytools options found, acopytools={self.acopytools}' logger.error(msg) self.trace_report.update(clientState='BAD_COPYTOOL', stateReason=msg) self.trace_report.send() diff --git a/pilot/control/data.py b/pilot/control/data.py index 3c76a9b9..864d6bfe 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -285,11 +285,13 @@ def _stage_in(args: object, job: JobData) -> bool: logger.info('stage-in will not be done in a container') client, activity = get_stagein_client(job, args, label) + logger.info(f'activity={activity}') use_pcache = job.infosys.queuedata.use_pcache - + logger.debug(f'use_pcache={use_pcache}') # get the proper input file destination (normally job.workdir unless stager workflow) jobworkdir = job.workdir # there is a distinction for mv copy tool on ND vs non-ATLAS workdir = get_proper_input_destination(job.workdir, args.input_destination_dir) + logger.debug(f'workdir={workdir}') kwargs = {'workdir': workdir, 'cwd': job.workdir, 'usecontainer': False, @@ -301,7 +303,9 @@ def _stage_in(args: object, job: JobData) -> bool: 'rucio_host': args.rucio_host, 'jobworkdir': jobworkdir, 'args': args} + logger.debug(f'kwargs={kwargs}') client.prepare_sources(job.indata) + logger.info('prepared sources - will now transfer files') client.transfer(job.indata, activity=activity, **kwargs) except PilotException as error: error_msg = traceback.format_exc() From c9ce7f9a7c25722d7d34ff014724cacc67825604 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 18 Sep 2024 16:11:53 +0200 Subject: [PATCH 10/15] Now using psutils instead of ps command --- pilot/user/atlas/utilities.py | 22 ++++++++++++++-------- pilot/util/constants.py | 2 +- pilot/util/psutils.py | 24 ++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py index c73df92a..27d91384 100644 --- a/pilot/user/atlas/utilities.py +++ b/pilot/user/atlas/utilities.py @@ -41,7 +41,10 @@ ) from pilot.util.parameters import convert_to_int from pilot.util.processes import is_process_running -from pilot.util.psutils import get_command_by_pid +from pilot.util.psutils import ( + get_command_by_pid, + find_process_by_jobid +) from .setup import get_asetup @@ -158,11 +161,9 @@ def get_proper_pid(pid: int, jobid: str, use_container: bool = True) -> int: if not is_process_running(pid): return -1 - ps = get_ps_info() - - # lookup the process id using ps aux + # lookup the process id using ps command or psutils logger.debug(f'attempting to identify pid from job id ({jobid})') - _pid = get_pid_for_jobid(ps, jobid) + _pid = get_pid_for_jobid(jobid) if _pid: logger.debug(f'discovered pid={_pid} for job id {jobid}') cmd = get_command_by_pid(_pid) @@ -188,6 +189,8 @@ def get_ps_info(whoami: str = None, options: str = 'axfo pid,user,args') -> str: """ Return ps info for the given user. + Note: this is a fallback solution in case the pid cannot be found in the psutils lookup. + :param whoami: username (str) :param options: ps options (str) :return: ps aux for given user (str). @@ -200,16 +203,19 @@ def get_ps_info(whoami: str = None, options: str = 'axfo pid,user,args') -> str: return stdout -def get_pid_for_jobid(ps: str, jobid: str) -> int or None: +def get_pid_for_jobid(jobid: str) -> int or None: """ Return the process id for the ps entry that contains the job id. - :param ps: ps command output (str) :param jobid: PanDA job id (str). :return: pid (int) or None if no such process (int or None). """ - pid = None + pid = find_process_by_jobid(jobid) + if pid: + return pid + # fallback to ps command + ps = get_ps_info() for line in ps.split('\n'): if jobid in line and 'xrootd' not in line: # extract pid diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 8c0bcef3..83e17faf 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '2' # build number should be reset to '1' for every new development cycle +BUILD = '4' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index f9b606e2..70bea556 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -267,3 +267,27 @@ def get_command_by_pid(pid: int) -> str or None: except psutil.NoSuchProcess: logger.warning(f"process with PID {pid} not found") return None + + +def find_process_by_jobid(jobid: int) -> int or None: + """ + Find the process ID of a process whose command arguments contain the given job ID. + + :param jobid: the job ID to search for (int) + :return: the process ID of the matching process, or None if no match is found (int or None). + """ + if not _is_psutil_available: + logger.warning('find_process_by_jobid(): psutil not available - aborting') + return None + + for proc in psutil.process_iter(): + try: + cmd_line = proc.cmdline() + except psutil.NoSuchProcess: + continue + + for arg in cmd_line: + if str(jobid) in arg: + return proc.pid + + return None From a6acee86a87e6a8a8e814f9d6168c1c838108f94 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 18 Sep 2024 18:21:57 +0200 Subject: [PATCH 11/15] Added debug messages, avoiding xrootd command --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/psutils.py | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index ebf6e6f6..3466543b 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.2.2 \ No newline at end of file +3.8.2.5 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 83e17faf..4a6c98f4 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '4' # build number should be reset to '1' for every new development cycle +BUILD = '5' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index 70bea556..d54670b3 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -286,8 +286,10 @@ def find_process_by_jobid(jobid: int) -> int or None: except psutil.NoSuchProcess: continue + logger.debug(f'cmd_line={cmd_line}') for arg in cmd_line: - if str(jobid) in arg: + logger.debug(f'arg={arg}') + if str(jobid) in arg and 'xrootd' not in arg: return proc.pid return None From 5685d2a116b7401d604e8ba62b353d0ace1a05a2 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Thu, 19 Sep 2024 17:48:51 +0200 Subject: [PATCH 12/15] Added possibility of setting rt logging info in catchall --- PILOTVERSION | 2 +- pilot/control/payload.py | 20 +++++++++++++++++--- pilot/util/constants.py | 2 +- pilot/util/realtimelogger.py | 2 +- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 3466543b..0a24dc84 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.2.5 \ No newline at end of file +3.8.2.6 \ No newline at end of file diff --git a/pilot/control/payload.py b/pilot/control/payload.py index 25adce15..7d091073 100644 --- a/pilot/control/payload.py +++ b/pilot/control/payload.py @@ -376,13 +376,21 @@ def extract_error_info(error: str) -> (int, str): return error_code, diagnostics -def get_rtlogging() -> str: +def get_rtlogging(catchall: str) -> str: """ - Return the proper rtlogging value from the experiment specific plug-in or the config file. + Return the proper rtlogging value from PQ.catchall, the experiment specific plug-in or the config file. + :param catchall: catchall field from queuedata (str) :return: rtlogging (str). """ + if catchall: + _rtlogging = findall(r'logging=([^,]+)', catchall) + if _rtlogging and ";" in _rtlogging[0]: + logger.info(f"found rtlogging in catchall: {_rtlogging[0]}") + return _rtlogging[0] + rtlogging = None + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() try: user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) @@ -419,7 +427,13 @@ def get_logging_info(job: JobData, args: object) -> dict: info_dic['logname'] = args.realtime_logname if args.realtime_logname else "pilot-log" logserver = args.realtime_logging_server if args.realtime_logging_server else "" - info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', get_rtlogging()) + try: + catchall = job.infosys.queuedata.catchall + except Exception as exc: + logger.warning(f'exception caught: {exc}') + catchall = "" + + info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', get_rtlogging(catchall)) if not logserver and not info: logger.warning(f"not enough info available for activating real-time logging (info='{info}', logserver='{logserver}')") return {} diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 4a6c98f4..b2693f62 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '5' # build number should be reset to '1' for every new development cycle +BUILD = '6' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index d06984fd..df085a29 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -121,7 +121,7 @@ def __init__(self, args: Any, info_dic: dict, workdir: str, secrets: str, level: if workdir: # bypass pylint warning - keep workdir for possible future development pass if not info_dic: - logger.warning('info dictionary not set - add \'logging=type:protocol://host:port\' to PQ.catchall)') + logger.warning('info dictionary not set - add \'logging=type;protocol://host:port\' to PQ.catchall)') RealTimeLogger.glogger = None return From 98d1360417feb53165e1b514b8ae5cf17f040fdf Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 23 Sep 2024 12:09:11 +0200 Subject: [PATCH 13/15] Removed debug message --- pilot/util/psutils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index d54670b3..eb70f263 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -286,9 +286,7 @@ def find_process_by_jobid(jobid: int) -> int or None: except psutil.NoSuchProcess: continue - logger.debug(f'cmd_line={cmd_line}') for arg in cmd_line: - logger.debug(f'arg={arg}') if str(jobid) in arg and 'xrootd' not in arg: return proc.pid From 486af877850f079cf729534c2f853ec9ce2168da Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 23 Sep 2024 13:08:59 +0200 Subject: [PATCH 14/15] Improved exception handling --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/https.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 0a24dc84..6e7d9009 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.2.6 \ No newline at end of file +3.8.2.7 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index b2693f62..4c42e76e 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '6' # build number should be reset to '1' for every new development cycle +BUILD = '7' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/https.py b/pilot/util/https.py index 9a2da0d2..a26c75ce 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -646,7 +646,7 @@ def get_panda_server(url: str, port: int, update_server: bool = True) -> str: if default in pandaserver: try: rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])]) - except socket.herror as exc: + except (socket.herror, socket.gaierror) as exc: logger.warning(f'failed to get address from socket: {exc} - will use default server ({pandaserver})') else: pandaserver = pandaserver.replace(default, rnd) From adaedd74cd00ea0126803f3af4349775c5e0151b Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 24 Sep 2024 10:53:41 +0200 Subject: [PATCH 15/15] Merge with Alexey's code --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 6e7d9009..e88f7664 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.2.7 \ No newline at end of file +3.8.2.8 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 4c42e76e..e497600c 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '7' # build number should be reset to '1' for every new development cycle +BUILD = '8' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1