
Commit

Merge pull request #142 from anisyonk/altstageout
alt stage-out
PalNilsson authored Sep 13, 2024
2 parents 4a45bc2 + 97d7202 commit fca3597
Showing 5 changed files with 113 additions and 27 deletions.
29 changes: 19 additions & 10 deletions pilot/api/data.py
@@ -87,8 +87,7 @@ def __init__(self,
                  default_copytools: str = 'rucio',
                  trace_report: dict = None,
                  ipv: str = 'IPv6',
-                 workdir: str = "",
-                 altstageout: str = None):
+                 workdir: str = ""):
         """
         Set default/init values.
@@ -113,7 +112,6 @@ def __init__(self,
         self.infosys = infosys_instance or infosys
         self.ipv = ipv
         self.workdir = workdir
-        self.altstageout = altstageout

         if isinstance(acopytools, str):
             acopytools = {'default': [acopytools]} if acopytools else {}
@@ -522,16 +520,17 @@ def transfer_files(self, copytool: Any, files: list, activity: list, **kwargs: d
"""
raise NotImplementedError()

def transfer(self, files: list, activity: list or str = 'default', **kwargs: dict) -> list: # noqa: C901
def transfer(self, files: list, activity: list or str = 'default', raise_exception: bool = True, **kwargs: dict) -> list: # noqa: C901
"""
Perform file transfer.
Automatically stage passed files using copy tools related to given `activity`.
:param files: list of `FileSpec` objects (list)
:param activity: list of activity names used to determine appropriate copytool (list or str)
:param raise_exception: boolean flag used to ignore transfer errors
:param kwargs: extra kwargs to be passed to copytool transfer handler (dict)
:raise: PilotException in case of controlled error
:raise: PilotException in case of controlled error if `raise_exception` is `True`
:return: list of processed `FileSpec` objects (list).
"""
self.trace_report.update(relativeStart=time.time(), transferStart=time.time())
@@ -548,7 +547,8 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic
                 break

         if not copytools:
-            raise PilotException(f'failed to resolve copytool by preferred activities={activity}, acopytools={self.acopytools}')
+            raise PilotException(f'failed to resolve copytool by preferred activities={activity}, acopytools={self.acopytools}',
+                                 code=ErrorCodes.UNKNOWNCOPYTOOL)

         # populate inputddms if needed
         self.prepare_inputddms(files)
@@ -633,7 +633,8 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic
             code = ErrorCodes.STAGEINFAILED if self.mode == 'stage-in' else ErrorCodes.STAGEOUTFAILED  # is it stage-in/out?
             details = str(caught_errors) + ":" + f'failed to transfer files using copytools={copytools}'
             self.logger.fatal(details)
-            raise PilotException(details, code=code)
+            if raise_exception:
+                raise PilotException(details, code=code)

         return files

@@ -1109,20 +1110,28 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:

         # take the fist choice for now, extend the logic later if need
         ddm = storages[0]
+        ddm_alt = storages[1] if len(storages) > 1 else None

         self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}")
-        self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination ddm={ddm}")
+        self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}")

         for e in files:
             if not e.ddmendpoint:  # no preferences => use default destination
                 self.logger.info("[prepare_destinations][%s]: fspec.ddmendpoint is not set for lfn=%s"
-                                 " .. will use default ddm=%s as (local) destination", activity, e.lfn, ddm)
+                                 " .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt)
                 e.ddmendpoint = ddm
+                e.ddmendpoint_alt = ddm_alt
             elif e.ddmendpoint not in storages:  # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination
                 self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
                                  " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
-                e.ddmendpoint_alt = e.ddmendpoint  # verify me
                 e.ddmendpoint = ddm
+                e.ddmendpoint_alt = e.ddmendpoint  # consider me later
+            else:  # set corresponding ddmendpoint_alt if exist (next entry in available storages list)
+                cur = storages.index(e.ddmendpoint)
+                ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0]  # cycle storages, take the first elem when reach end
+                e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None
+                self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
+                                 activity, e.ddmendpoint_alt, e.ddmendpoint)

         return files

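The ddmendpoint_alt selection in prepare_destinations() cycles through the allowed local storages: the entry after the requested endpoint becomes the fallback, wrapping around to the first element when the end of the list is reached. A minimal standalone sketch of that selection, using hypothetical storage names and plain strings instead of FileSpec objects:

    from typing import Optional

    # hypothetical list of local storages associated with the activity
    storages = ["SITE_DATADISK", "SITE_SCRATCHDISK", "SITE_LOCALGROUPDISK"]

    def pick_alt(ddmendpoint: str) -> Optional[str]:
        """Return the next storage in the list as the alternative destination, or None if no distinct one exists."""
        if ddmendpoint not in storages:
            return None
        cur = storages.index(ddmendpoint)
        ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0]  # cycle, wrap to the first element
        return ddm_next if ddm_next != ddmendpoint else None

    assert pick_alt("SITE_SCRATCHDISK") == "SITE_LOCALGROUPDISK"
    assert pick_alt("SITE_LOCALGROUPDISK") == "SITE_DATADISK"  # wraps around to the head of the list
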
53 changes: 37 additions & 16 deletions pilot/control/data.py
@@ -909,14 +909,33 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
         # create the trace report
         trace_report = create_trace_report(job, label=label)

-        client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir, altstageout=job.altstageout)
+        client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir)
         kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job,
                   'output_dir': args.output_dir, 'catchall': job.infosys.queuedata.catchall,
                   'rucio_host': args.rucio_host}  #, mode='stage-out')
+        is_unified = job.infosys.queuedata.type == 'unified'
         # prod analy unification: use destination preferences from PanDA server for unified queues
-        if job.infosys.queuedata.type != 'unified':
+        if not is_unified:
             client.prepare_destinations(xdata, activity)  ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
-        client.transfer(xdata, activity, **kwargs)
+
+        altstageout = not is_unified and job.allow_altstageout()  # do not use alt stage-out for unified queues
+        client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
+        remain_files = [entry for entry in xdata if entry.require_transfer()]
+        # check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt)
+        has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files)
+
+        logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s',
+                    activity, is_unified, altstageout, len(remain_files), has_altstorage)
+
+        if altstageout and remain_files and has_altstorage:  # apply alternative stageout for failed transfers
+            for entry in remain_files:
+                entry.ddmendpoint = entry.ddmendpoint_alt
+                entry.ddmendpoint_alt = None
+                entry.is_altstaged = True
+
+            logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files))
+            client.transfer(xdata, activity, **kwargs)

     except PilotException as error:
         error_msg = traceback.format_exc()
         logger.error(error_msg)
@@ -936,14 +955,10 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
         logger.info(f'switched back proxy on unified dispatch queue: X509_USER_PROXY={x509_org} (reset X509_UNIFIED_DISPATCH)')

     logger.info('summary of transferred files:')
-    for iofile in xdata:
-        if not iofile.status:
-            status = "(not transferred)"
-        else:
-            status = iofile.status
-        logger.info(" -- lfn=%s, status_code=%s, status=%s", iofile.lfn, iofile.status_code, status)
+    for entry in xdata:
+        logger.info(" -- lfn=%s, status_code=%s, status=%s", entry.lfn, entry.status_code, entry.status or "(not transferred)")

-    remain_files = [iofile for iofile in xdata if iofile.status not in ['transferred']]
+    remain_files = [entry for entry in xdata if entry.status not in ['transferred']]

     return not remain_files

@@ -1059,12 +1074,18 @@ def generate_fileinfo(job: JobData) -> dict:
"""
fileinfo = {}
checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum'
for iofile in job.outdata + job.logdata:
if iofile.status in {'transferred'}:
fileinfo[iofile.lfn] = {'guid': iofile.guid,
'fsize': iofile.filesize,
f'{checksum_type}': iofile.checksum.get(config.File.checksum_type),
'surl': iofile.turl}
for entry in job.outdata + job.logdata:
if entry.status in {'transferred'}:
dat = {
'guid': entry.guid,
'fsize': entry.filesize,
f'{checksum_type}': entry.checksum.get(config.File.checksum_type),
'surl': entry.turl
}
if entry.is_altstaged:
dat['ddmendpoint'] = entry.ddmendpoint

fileinfo[entry.lfn] = dat

return fileinfo

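For a file that ended up at its alternative endpoint, generate_fileinfo() now also reports the final ddmendpoint, so the server learns where the replica actually landed. A rough illustration of the resulting structure, with hypothetical LFN, GUID, checksum and endpoint values and assuming adler32 checksums:

    fileinfo = {
        "log.43210.job.log.tgz": {                           # hypothetical LFN
            "guid": "5c9db61a-0000-0000-0000-000000000000",  # hypothetical GUID
            "fsize": 123456,
            "adler32": "3a2b1c4d",
            "surl": "davs://alt-se.example.org:443/rucio/log.43210.job.log.tgz",
            "ddmendpoint": "SITE_ALT_DATADISK",              # only added when entry.is_altstaged is set
        },
    }
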
8 changes: 8 additions & 0 deletions pilot/info/filespec.py
@@ -77,6 +77,7 @@ class FileSpec(BaseData):
     is_tar = False           # whether it's a tar file or not
     ddm_activity = None      # DDM activity names (e.g. [read_lan, read_wan]) which should be used to resolve appropriate protocols from StorageData.arprotocols
     checkinputsize = True
+    is_altstaged = None      # indicates if file was transferred using alternative method (altstageout)

     # specify the type of attributes for proper data validation and casting
     _keys = {int: ['filesize', 'mtime', 'status_code'],
@@ -216,3 +217,10 @@ def get_storage_id_and_path_convention(self) -> (str, str):
         logger.info(f'storage_id: {storage_id}, path_convention: {path_convention}')

         return storage_id, path_convention
+
+    def require_transfer(self) -> bool:
+        """
+        Check if File needs to be transferred (in error state or never has been started)
+        """
+
+        return self.status not in ['remote_io', 'transferred', 'no_transfer']
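
require_transfer() is what the stage-out control flow above uses to pick up files that still need to be moved after the first pass: anything not already transferred, served via remote I/O, or explicitly marked as needing no transfer. A small sketch of the same check on bare status strings (hypothetical values):

    FINAL_STATES = {"remote_io", "transferred", "no_transfer"}

    def needs_transfer(status) -> bool:
        """True for files that are still pending or failed, i.e. not in a 'no further transfer needed' state."""
        return status not in FINAL_STATES

    assert needs_transfer("failed") is True
    assert needs_transfer(None) is True            # transfer never attempted
    assert needs_transfer("transferred") is False
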
35 changes: 34 additions & 1 deletion pilot/info/jobdata.py
@@ -169,7 +169,7 @@ class JobData(BaseData):
     dask_scheduler_ip = ''   # enhanced job definition for Dask jobs
     jupyter_session_ip = ''  # enhanced job definition for Dask jobs
     minramcount = 0          # minimum number of RAM required by the payload
-    altstageout = None       # alternative stage-out method, on, off, force
+    altstageout = None       # alternative stage-out method: boolean, on (True), off (False)
     # home package string with additional payload release information; does not need to be added to
     # the conversion function since it's already lower case
     homepackage = ""         # home package for TRF
@@ -575,6 +575,26 @@ def has_remoteio(self) -> bool:
"""
return any(fspec.status == 'remote_io' for fspec in self.indata)

def allow_altstageout(self):
"""
Resolve if alternative stageout is allowed for given job taking into account `queuedata` settings as well.
`queuedata` specific settings overwrites job preferences for altstageout.
:return: boolean.
"""

# consider first the queue specific settings (if any)
if self.infosys and self.infosys.queuedata:
qval = self.infosys.queuedata.altstageout
if qval is not None:
return qval
else:
logger.info('job.infosys.queuedata is not initialized: PandaQueue specific settings for altstageout will not be considered')

# apply additional job specific checks here if need

return bool(self.altstageout)

def clean(self):
"""
Validate and finally clean up required data values (object properties) if needed.
@@ -628,6 +648,19 @@ def clean__platform(self, raw: Any, value: str) -> str:

         return v

+    def clean__altstageout(self, raw: Any, value: str) -> Any:
+        """
+        Verify and validate value for the altstageout key.
+
+        :param raw: raw value (Any)
+        :param value: parsed value (str)
+        :return: cleaned value (bool or None).
+        """
+        if value == 'on':
+            return True
+        if value == 'off':
+            return False
+
     def clean__jobparams(self, raw: Any, value: str) -> str:
         """
         Verify and validate value for the jobparams key.
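
Taken together, clean__altstageout() maps the job-level 'on'/'off' string to a boolean (anything else stays None), and allow_altstageout() lets an explicit queue-level preference override that job value. A hedged sketch of the resulting precedence with hypothetical inputs (not the actual class methods):

    def resolve_altstageout(queue_setting, job_setting):
        """queue_setting: True/False/None from queuedata; job_setting: 'on'/'off'/None from the job definition."""
        if queue_setting is not None:  # PanDA queue preference wins over the job preference
            return queue_setting
        job_value = {"on": True, "off": False}.get(job_setting)  # mirrors clean__altstageout()
        return bool(job_value)         # None (no preference) falls back to False

    assert resolve_altstageout(None, "on") is True
    assert resolve_altstageout(False, "on") is False   # queue-level 'disable' overrides the job
    assert resolve_altstageout(None, None) is False    # no preference anywhere => alt stage-out off
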
15 changes: 15 additions & 0 deletions pilot/info/queuedata.py
@@ -96,6 +96,8 @@ class QueueData(BaseData):
     is_cvmfs = True          # has cvmfs installed
     memkillgrace = 100       # memory kill grace value in percentage

+    altstageout = None       # allow altstageout: force (True) or disable (False) or no preferences (None)
+
     # specify the type of attributes for proper data validation and casting
     _keys = {int: ['timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap',
                    'corecount', 'maxrss', 'maxtime', 'maxinputsize', 'memkillgrace'],
@@ -171,6 +173,16 @@ def resolve_allowed_schemas(self, activity: str or list, copytool: str = None) -

         return adat.get(copytool) or []

+    def allow_altstageout(self):
+        """
+        Resolve if alternative stageout should be forced (True) or disabled (False); None value means no preferences defined.
+
+        :return: boolean or None
+        """
+        val = self.params.get('allow_altstageout', None)
+        if val is not None:  # cast to bool
+            return bool(val)
+
     def clean(self):
         """Validate and finally clean up required data values (required object properties) if needed."""
         # validate es_stageout_gap value
@@ -197,6 +209,9 @@ def clean(self):
             self.container_options = self.container_options.replace(" --contain", ",${workdir} --contain")
             logger.info(f"note: added missing $workdir to container_options: {self.container_options}")

+        # set altstageout settings
+        self.altstageout = self.allow_altstageout()
+
     ## custom function pattern to apply extra validation to the key values
     ##def clean__keyname(self, raw, value):
     ##    :param raw: raw value passed from ext source as input
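
On the queue side, the preference comes from the 'allow_altstageout' field of the queue parameters and is cached on the object as altstageout during clean(). A minimal sketch of how that lookup behaves, using a hypothetical params payload:

    def queue_altstageout(params: dict):
        """Mirrors QueueData.allow_altstageout(): True/False if the key is present, otherwise None."""
        val = params.get("allow_altstageout", None)
        if val is not None:
            return bool(val)  # cast whatever the info system sends to a boolean
        return None

    assert queue_altstageout({"allow_altstageout": 1}) is True    # hypothetical queuedata fragment
    assert queue_altstageout({"allow_altstageout": False}) is False
    assert queue_altstageout({}) is None                          # no preference declared for the queue
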
