From 295210a91e0b73733bf33917876d13ccf4c9986d Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Mon, 9 Sep 2024 16:00:00 +0700 Subject: [PATCH 1/5] Implemented alternative stage-out (altstageout) to different RSE specified in PQ.astorages on primary stage-out transfer failures (for output and log files) --- pilot/api/data.py | 26 +++++++++++++++++--------- pilot/control/data.py | 33 +++++++++++++++++++++++---------- pilot/info/filespec.py | 7 +++++++ pilot/info/jobdata.py | 35 ++++++++++++++++++++++++++++++++++- pilot/info/queuedata.py | 15 +++++++++++++++ 5 files changed, 96 insertions(+), 20 deletions(-) diff --git a/pilot/api/data.py b/pilot/api/data.py index 3305b47f..b5b96005 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -87,8 +87,7 @@ def __init__(self, default_copytools: str = 'rucio', trace_report: dict = None, ipv: str = 'IPv6', - workdir: str = "", - altstageout: str = None): + workdir: str = ""): """ Set default/init values. @@ -113,7 +112,6 @@ def __init__(self, self.infosys = infosys_instance or infosys self.ipv = ipv self.workdir = workdir - self.altstageout = altstageout if isinstance(acopytools, str): acopytools = {'default': [acopytools]} if acopytools else {} @@ -522,7 +520,7 @@ def transfer_files(self, copytool: Any, files: list, activity: list, **kwargs: d """ raise NotImplementedError() - def transfer(self, files: list, activity: list or str = 'default', **kwargs: dict) -> list: # noqa: C901 + def transfer(self, files: list, activity: list or str = 'default', raise_exception: bool = True, **kwargs: dict) -> list: # noqa: C901 """ Perform file transfer. @@ -530,8 +528,9 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic :param files: list of `FileSpec` objects (list) :param activity: list of activity names used to determine appropriate copytool (list or str) + :param raise_exception: boolean flag used to ignore transfer errors :param kwargs: extra kwargs to be passed to copytool transfer handler (dict) - :raise: PilotException in case of controlled error + :raise: PilotException in case of controlled error if `raise_exception` is `True` :return: list of processed `FileSpec` objects (list). """ self.trace_report.update(relativeStart=time.time(), transferStart=time.time()) @@ -548,7 +547,8 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic break if not copytools: - raise PilotException(f'failed to resolve copytool by preferred activities={activity}, acopytools={self.acopytools}') + raise PilotException(f'failed to resolve copytool by preferred activities={activity}, acopytools={self.acopytools}', + code=ErrorCodes.UNKNOWNCOPYTOOL) # populate inputddms if needed self.prepare_inputddms(files) @@ -1109,20 +1109,28 @@ def prepare_destinations(self, files: list, activities: list or str) -> list: # take the fist choice for now, extend the logic later if need ddm = storages[0] + ddm_alt = storages[1] if len(storages) > 1 else None self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}") - self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination ddm={ddm}") + self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}") for e in files: if not e.ddmendpoint: # no preferences => use default destination self.logger.info("[prepare_destinations][%s]: fspec.ddmendpoint is not set for lfn=%s" - " .. will use default ddm=%s as (local) destination", activity, e.lfn, ddm) + " .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt) e.ddmendpoint = ddm + e.ddmendpoint_alt = ddm_alt elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations" " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint) e.ddmendpoint = ddm - e.ddmendpoint_alt = e.ddmendpoint # consider me later + e.ddmendpoint_alt = e.ddmendpoint + else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list) + cur = storages.index(e.ddmendpoint) + ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end + e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None + self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s", + activity, e.ddmendpoint_alt, e.ddmendpoint) return files diff --git a/pilot/control/data.py b/pilot/control/data.py index 3c76a9b9..712903de 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -909,14 +909,31 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: # create the trace report trace_report = create_trace_report(job, label=label) - client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir, altstageout=job.altstageout) + client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir) kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job, 'output_dir': args.output_dir, 'catchall': job.infosys.queuedata.catchall, 'rucio_host': args.rucio_host} #, mode='stage-out') + is_unified = job.infosys.queuedata.type == 'unified' # prod analy unification: use destination preferences from PanDA server for unified queues - if job.infosys.queuedata.type != 'unified': + if not is_unified: client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow) - client.transfer(xdata, activity, **kwargs) + + altstageout = not is_unified and job.allow_altstageout() # do not use alt stage-out for unified queues + client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs) + remain_files = [entry for entry in xdata if entry.require_transfer()] + # check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt) + has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files) + + logger.info('alt stage-out settings[%s]: is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', + activity, is_unified, len(remain_files), has_altstorage) + + if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers + for entry in remain_files: + entry.ddmendpoint = entry.ddmendpoint_alt + entry.ddmendpoint_alt = None + + client.transfer(xdata, activity, **kwargs) + except PilotException as error: error_msg = traceback.format_exc() logger.error(error_msg) @@ -936,14 +953,10 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: logger.info(f'switched back proxy on unified dispatch queue: X509_USER_PROXY={x509_org} (reset X509_UNIFIED_DISPATCH)') logger.info('summary of transferred files:') - for iofile in xdata: - if not iofile.status: - status = "(not transferred)" - else: - status = iofile.status - logger.info(" -- lfn=%s, status_code=%s, status=%s", iofile.lfn, iofile.status_code, status) + for entry in xdata: + logger.info(" -- lfn=%s, status_code=%s, status=%s", entry.lfn, entry.status_code, entry.status or "(not transferred)") - remain_files = [iofile for iofile in xdata if iofile.status not in ['transferred']] + remain_files = [entry for entry in xdata if entry.status not in ['transferred']] return not remain_files diff --git a/pilot/info/filespec.py b/pilot/info/filespec.py index d7e8e906..307b18c9 100644 --- a/pilot/info/filespec.py +++ b/pilot/info/filespec.py @@ -216,3 +216,10 @@ def get_storage_id_and_path_convention(self) -> (str, str): logger.info(f'storage_id: {storage_id}, path_convention: {path_convention}') return storage_id, path_convention + + def require_transfer(self) -> bool: + """ + Check if File needs to be transferred (in error state or never has been started) + """ + + return self.status not in ['remote_io', 'transferred', 'no_transfer'] diff --git a/pilot/info/jobdata.py b/pilot/info/jobdata.py index f936dbbf..2cc149be 100644 --- a/pilot/info/jobdata.py +++ b/pilot/info/jobdata.py @@ -169,7 +169,7 @@ class JobData(BaseData): dask_scheduler_ip = '' # enhanced job definition for Dask jobs jupyter_session_ip = '' # enhanced job definition for Dask jobs minramcount = 0 # minimum number of RAM required by the payload - altstageout = None # alternative stage-out method, on, off, force + altstageout = None # alternative stage-out method: boolean, on (True), off (False) # home package string with additional payload release information; does not need to be added to # the conversion function since it's already lower case homepackage = "" # home package for TRF @@ -575,6 +575,26 @@ def has_remoteio(self) -> bool: """ return any(fspec.status == 'remote_io' for fspec in self.indata) + def allow_altstageout(self): + """ + Resolve if alternative stageout is allowed for given job taking into account `queuedata` settings as well. + `queuedata` specific settings overwrites job preferences for altstageout. + + :return: boolean. + """ + + # consider first the queue specific settings (if any) + if self.infosys and self.infosys.queuedata: + qval = self.infosys.queuedata.altstageout + if qval is not None: + return qval + else: + logger.info('job.infosys.queuedata is not initialized: PandaQueue specific settings for altstageout will not be considered') + + # apply additional job specific checks here if need + + return bool(self.altstageout) + def clean(self): """ Validate and finally clean up required data values (object properties) if needed. @@ -628,6 +648,19 @@ def clean__platform(self, raw: Any, value: str) -> str: return v + def clean__altstageout(self, raw: Any, value: str) -> Any: + """ + Verify and validate value for the altstageout key. + + :param raw: raw value (Any) + :param value: parsed value (str) + :return: cleaned value (bool or None). + """ + if value == 'on': + return True + if value == 'off': + return False + def clean__jobparams(self, raw: Any, value: str) -> str: """ Verify and validate value for the jobparams key. diff --git a/pilot/info/queuedata.py b/pilot/info/queuedata.py index 3e3ee1b7..5508c260 100644 --- a/pilot/info/queuedata.py +++ b/pilot/info/queuedata.py @@ -96,6 +96,8 @@ class QueueData(BaseData): is_cvmfs = True # has cvmfs installed memkillgrace = 100 # memory kill grace value in percentage + altstageout = None # allow altstageout: force (True) or disable (False) or no preferences (None) + # specify the type of attributes for proper data validation and casting _keys = {int: ['timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap', 'corecount', 'maxrss', 'maxtime', 'maxinputsize', 'memkillgrace'], @@ -171,6 +173,16 @@ def resolve_allowed_schemas(self, activity: str or list, copytool: str = None) - return adat.get(copytool) or [] + def allow_altstageout(self): + """ + Resolve if alternative stageout should be forced (True) or disabled (False); None value means no preferences defined. + :return: boolean or None + """ + + val = self.params.get('allow_altstageout', None) + if val is not None: # cast to bool + return bool(val) + def clean(self): """Validate and finally clean up required data values (required object properties) if needed.""" # validate es_stageout_gap value @@ -197,6 +209,9 @@ def clean(self): self.container_options = self.container_options.replace(" --contain", ",${workdir} --contain") logger.info(f"note: added missing $workdir to container_options: {self.container_options}") + # set altstageout settings + self.altstageout = self.allow_altstageout() + ## custom function pattern to apply extra validation to the key values ##def clean__keyname(self, raw, value): ## :param raw: raw value passed from ext source as input From 770ebf58649bceb1ac37350238c176032a668e89 Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 15:52:46 +0700 Subject: [PATCH 2/5] typo fix --- pilot/control/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/control/data.py b/pilot/control/data.py index 712903de..4c36d70a 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -925,7 +925,7 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files) logger.info('alt stage-out settings[%s]: is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', - activity, is_unified, len(remain_files), has_altstorage) + activity, is_unified, altstageout, len(remain_files), has_altstorage) if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers for entry in remain_files: From f3ea527db1d61f635548806927206b46a777213b Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 19:27:59 +0700 Subject: [PATCH 3/5] fixes --- pilot/api/data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pilot/api/data.py b/pilot/api/data.py index b5b96005..23831cc6 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -633,7 +633,8 @@ def transfer(self, files: list, activity: list or str = 'default', raise_excepti code = ErrorCodes.STAGEINFAILED if self.mode == 'stage-in' else ErrorCodes.STAGEOUTFAILED # is it stage-in/out? details = str(caught_errors) + ":" + f'failed to transfer files using copytools={copytools}' self.logger.fatal(details) - raise PilotException(details, code=code) + if raise_exception: + raise PilotException(details, code=code) return files From f13770adeefc2c9b9a016c65770c5e19989d0052 Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 20:27:59 +0700 Subject: [PATCH 4/5] minor fix --- pilot/api/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/api/data.py b/pilot/api/data.py index 23831cc6..76be4e73 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -1124,8 +1124,8 @@ def prepare_destinations(self, files: list, activities: list or str) -> list: elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations" " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint) + e.ddmendpoint_alt = e.ddmendpoint # verify me e.ddmendpoint = ddm - e.ddmendpoint_alt = e.ddmendpoint else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list) cur = storages.index(e.ddmendpoint) ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end From 97d720242a56416f07edc4d98a6f91a8507521a8 Mon Sep 17 00:00:00 2001 From: Alexey Anisenkov Date: Tue, 10 Sep 2024 22:38:05 +0700 Subject: [PATCH 5/5] report back file.ddmendpoint value for alternative staged files --- pilot/control/data.py | 22 +++++++++++++++------- pilot/info/filespec.py | 1 + 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pilot/control/data.py b/pilot/control/data.py index 4c36d70a..506ce68a 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -924,14 +924,16 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: # check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt) has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files) - logger.info('alt stage-out settings[%s]: is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', + logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', activity, is_unified, altstageout, len(remain_files), has_altstorage) if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers for entry in remain_files: entry.ddmendpoint = entry.ddmendpoint_alt entry.ddmendpoint_alt = None + entry.is_altstaged = True + logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files)) client.transfer(xdata, activity, **kwargs) except PilotException as error: @@ -1072,12 +1074,18 @@ def generate_fileinfo(job: JobData) -> dict: """ fileinfo = {} checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum' - for iofile in job.outdata + job.logdata: - if iofile.status in {'transferred'}: - fileinfo[iofile.lfn] = {'guid': iofile.guid, - 'fsize': iofile.filesize, - f'{checksum_type}': iofile.checksum.get(config.File.checksum_type), - 'surl': iofile.turl} + for entry in job.outdata + job.logdata: + if entry.status in {'transferred'}: + dat = { + 'guid': entry.guid, + 'fsize': entry.filesize, + f'{checksum_type}': entry.checksum.get(config.File.checksum_type), + 'surl': entry.turl + } + if entry.is_altstaged: + dat['ddmendpoint'] = entry.ddmendpoint + + fileinfo[entry.lfn] = dat return fileinfo diff --git a/pilot/info/filespec.py b/pilot/info/filespec.py index 307b18c9..ef4700ed 100644 --- a/pilot/info/filespec.py +++ b/pilot/info/filespec.py @@ -77,6 +77,7 @@ class FileSpec(BaseData): is_tar = False # whether it's a tar file or not ddm_activity = None # DDM activity names (e.g. [read_lan, read_wan]) which should be used to resolve appropriate protocols from StorageData.arprotocols checkinputsize = True + is_altstaged = None # indicates if file was transferred using alternative method (altstageout) # specify the type of attributes for proper data validation and casting _keys = {int: ['filesize', 'mtime', 'status_code'],