Merge pull request #152 from anisyonk/altstageout_unified
alt stage-out for unified queues
PalNilsson authored Nov 22, 2024
2 parents 41ed500 + 024cd70 commit 3440240
Showing 27 changed files with 398 additions and 249 deletions.
PILOTVERSION (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
3.9.1.13
3.9.2.23b
pilot.py (14 changes: 11 additions & 3 deletions)
@@ -147,7 +147,7 @@ def main() -> int:
return error.get_error_code()

# update the OIDC token if necessary (after queuedata has been downloaded, since PQ.catchall can contain instruction to prevent token renewal)
if 'no_token_renewal' in infosys.queuedata.catchall:
if 'no_token_renewal' in infosys.queuedata.catchall or args.token_renewal is False:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)
@@ -182,8 +182,6 @@ def main() -> int:
f"pilot.workflow.{args.workflow}", globals(), locals(), [args.workflow], 0
)

# check if real-time logging is requested for this queue
#rtloggingtype
# update the pilot heartbeat file
update_pilot_heartbeat(time.time())

@@ -451,6 +449,16 @@ def get_args() -> Any:
help="Maximum number of getjob request failures in Harvester mode",
)

# no_token_renewal
arg_parser.add_argument(
"-y",
"--notokenrenewal",
dest="token_renewal",
action="store_false",
default=True,
help="Disable token renewal",
)

arg_parser.add_argument(
"--subscribe-to-msgsvc",
dest="subscribe_to_msgsvc",
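As a side note on the new flag: a minimal, self-contained sketch of how the command-line switch and the queuedata catchall combine to gate token renewal. The helper name token_renewal_enabled is hypothetical, not part of the commit; only the argparse definition is taken from the diff above.

import argparse

def token_renewal_enabled(catchall: str, token_renewal: bool) -> bool:
    # renewal is skipped if either the queue (PQ.catchall) or the user (--notokenrenewal) disables it
    return "no_token_renewal" not in catchall and token_renewal

parser = argparse.ArgumentParser()
parser.add_argument("-y", "--notokenrenewal", dest="token_renewal",
                    action="store_false", default=True,
                    help="Disable token renewal")

args = parser.parse_args(["--notokenrenewal"])
print(token_renewal_enabled("", args.token_renewal))    # False: disabled via the flag
print(token_renewal_enabled("no_token_renewal", True))  # False: disabled via PQ.catchall
print(token_renewal_enabled("", True))                  # True: renewal proceeds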
pilot/api/data.py (49 changes: 33 additions & 16 deletions)
@@ -20,7 +20,7 @@
# - Mario Lassnig, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2024
# - Tobias Wegner, [email protected], 2017-2018
# - Alexey Anisenkov, [email protected], 2018-2019
# - Alexey Anisenkov, [email protected], 2018-2024

"""API for data transfers."""

@@ -1072,14 +1072,15 @@ class StageOutClient(StagingClient):

mode = "stage-out"

def prepare_destinations(self, files: list, activities: list or str) -> list:
def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = []) -> list:
"""
Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`.
Apply Pilot-side logic to choose proper destination.
:param files: list of FileSpec objects to be processed (list)
:param activities: ordered list of activities to be used to resolve astorages (list or str)
:param alt_exclude: global list of destinations that should be excluded / not used for alternative stage-out (list)
:return: updated fspec entries (list).
"""
if not self.infosys.queuedata: # infosys is not initialized: not able to fix destination if need, nothing to do
@@ -1108,11 +1109,26 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
raise PilotException(f"Failed to resolve destination: no associated storages defined for activity={activity} ({act})",
code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')

# take the fist choice for now, extend the logic later if need
ddm = storages[0]
ddm_alt = storages[1] if len(storages) > 1 else None
def resolve_alt_destination(primary, exclude=None):
""" resolve alt destination as the next to primary entry not equal to `primary` and `exclude` """

cur = storages.index(primary) if primary in storages else 0
exclude = set([primary] + list(exclude if exclude is not None else []))
alt = None
for attempt in range(len(exclude) or 1): # make several attempts to skip excluded entries (duplicated entries could otherwise stall the cycle)
inext = (cur + 1) % len(storages) # cycle storages, start from the beginning when reach end
if storages[inext] not in exclude:
alt = storages[inext]
break
cur += 1
return alt

# default destination
ddm = storages[0] # take the first choice for now, extend the logic later if needed
ddm_alt = resolve_alt_destination(ddm, exclude=alt_exclude)

self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}")
self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}, alt_exclude={alt_exclude}")
self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}")

for e in files:
@@ -1121,17 +1137,18 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
" .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt)
e.ddmendpoint = ddm
e.ddmendpoint_alt = ddm_alt
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination
#elif e.ddmendpoint not in storages and is_unified: ## customize nucleus logic if need
# pass
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => use it as (non local) alternative destination
self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
" .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint # verify me
e.ddmendpoint = ddm
else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list)
cur = storages.index(e.ddmendpoint)
ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end
e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None
self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)
" .. will consider default ddm=%s as primary and set %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint if e.ddmendpoint not in alt_exclude else None
e.ddmendpoint = ddm # use default destination, check/verify nucleus case
else: # set corresponding ddmendpoint_alt if it exists (next entry in the cycled storages list)
e.ddmendpoint_alt = resolve_alt_destination(e.ddmendpoint, exclude=alt_exclude)

self.logger.info("[prepare_destinations][%s]: use ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)

return files

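To make the new cyclic lookup easier to follow, here is a standalone sketch of the same idea: module-level rather than nested, with a None default instead of a mutable one, and looping over all storages instead of len(exclude) attempts. It is a simplification under those assumptions, not the committed code.

def resolve_alt_destination(storages: list, primary: str, exclude: list = None) -> str:
    """Return the entry following `primary` in `storages`, cycling past the end
    and skipping `primary` itself plus anything in `exclude`; None if nothing is left."""
    skip = set([primary] + list(exclude or []))
    cur = storages.index(primary) if primary in storages else 0
    for _ in range(len(storages)):
        cur = (cur + 1) % len(storages)  # cycle: wrap to the first element at the end
        if storages[cur] not in skip:
            return storages[cur]
    return None

storages = ["RSE_A", "RSE_B", "RSE_C"]
print(resolve_alt_destination(storages, "RSE_A"))                    # RSE_B
print(resolve_alt_destination(storages, "RSE_A", exclude=["RSE_B"])) # RSE_C: the exclusion jumps past RSE_B
print(resolve_alt_destination(["RSE_A"], "RSE_A"))                   # None: no alternative available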
pilot/common/errorcodes.py (2 changes: 2 additions & 0 deletions)
@@ -182,6 +182,7 @@ class ErrorCodes:
PREEMPTION = 1379
ARCPROXYFAILURE = 1380
ARCPROXYLIBFAILURE = 1381
PROXYTOOSHORT = 1382 # used at the beginning of the pilot to indicate that the proxy is too short

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
@@ -326,6 +327,7 @@ class ErrorCodes:
PREEMPTION: "Job was preempted",
ARCPROXYFAILURE: "General arcproxy failure",
ARCPROXYLIBFAILURE: "Arcproxy failure while loading shared libraries",
PROXYTOOSHORT: "Proxy is too short",
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
pilot/control/data.py (23 changes: 15 additions & 8 deletions)
@@ -917,19 +917,22 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job,
'output_dir': args.output_dir, 'catchall': job.infosys.queuedata.catchall,
'rucio_host': args.rucio_host} #, mode='stage-out')
is_unified = job.infosys.queuedata.type == 'unified'
#is_unified = job.infosys.queuedata.type == 'unified'
# prod analy unification: use destination preferences from PanDA server for unified queues
if not is_unified:
client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
#if not is_unified:
# client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)

altstageout = not is_unified and job.allow_altstageout() # do not use alt stage-out for unified queues
## FIX ME LATER: split activities: for `astorages` and `copytools` (to unify with ES workflow)
client.prepare_destinations(xdata, activity, alt_exclude=list(filter(None, [job.nucleus])))

altstageout = job.allow_altstageout()
client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
remain_files = [entry for entry in xdata if entry.require_transfer()]
# check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt)
has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files)

logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s',
activity, is_unified, altstageout, len(remain_files), has_altstorage)
logger.info('alt stage-out settings: %s, allow_altstageout=%s, remain_files=%s, has_altstorage=%s',
activity, altstageout, len(remain_files), has_altstorage)

if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers
for entry in remain_files:
@@ -992,8 +995,12 @@ def _stage_out_new(job: JobData, args: object) -> bool:
logger.info('this job does not have any output files, only stage-out log file')
job.stageout = 'log'

is_unified = job.infosys.queuedata.type == 'unified'
is_analysis = job.is_analysis()
activities = ['write_lan_analysis', 'write_lan', 'w'] if is_unified and is_analysis else ['write_lan', 'w']

if job.stageout != 'log': ## do stage-out output files
if not _do_stageout(job, args, job.outdata, ['pw', 'w'], title='output',
if not _do_stageout(job, args, job.outdata, activities, title='output',
ipv=args.internet_protocol_version):
is_success = False
logger.warning('transfer of output file(s) failed')
@@ -1037,7 +1044,7 @@ def _stage_out_new(job: JobData, args: object) -> bool:
# write time stamps to pilot timing file
add_to_pilot_timing(job.jobid, PILOT_POST_LOG_TAR, time.time(), args)

if not _do_stageout(job, args, [logfile], ['pl', 'pw', 'w'], title='log',
if not _do_stageout(job, args, [logfile], ['pl'] + activities, title='log',
ipv=args.internet_protocol_version):
is_success = False
logger.warning('log transfer failed')
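The net effect of the control/data.py changes is twofold: destination preparation now always runs (alternative stage-out is no longer blocked for unified queues), and unified analysis jobs get a dedicated activity list. A small sketch of the activity selection under those assumptions; the helper name stageout_activities is hypothetical, only the activity lists come from the diff above.

def stageout_activities(queue_type: str, is_analysis: bool, log: bool = False) -> list:
    # unified analysis queues try 'write_lan_analysis' first, then fall back
    activities = (["write_lan_analysis", "write_lan", "w"]
                  if queue_type == "unified" and is_analysis
                  else ["write_lan", "w"])
    return ["pl"] + activities if log else activities

print(stageout_activities("unified", True))            # ['write_lan_analysis', 'write_lan', 'w']
print(stageout_activities("production", False))        # ['write_lan', 'w']
print(stageout_activities("unified", True, log=True))  # ['pl', 'write_lan_analysis', 'write_lan', 'w']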
pilot/control/job.py (6 changes: 3 additions & 3 deletions)
@@ -1550,13 +1550,13 @@ def proceed_with_getjob(timefloor: int, starttime: int, jobnumber: int, getjob_r
if verify_proxy:
userproxy = __import__(f'pilot.user.{pilot_user}.proxy', globals(), locals(), [pilot_user], 0)

# is the proxy still valid?
exit_code, diagnostics = userproxy.verify_proxy(test=False)
# is the proxy still valid? at pilot startup, the proxy lifetime must be at least 72h
exit_code, diagnostics = userproxy.verify_proxy(test=False, pilotstartup=True)
if traces.pilot['error_code'] == 0: # careful so we don't overwrite another error code
traces.pilot['error_code'] = exit_code
if exit_code == errors.ARCPROXYLIBFAILURE:
logger.warning("currently ignoring arcproxy library failure")
if exit_code in {errors.NOPROXY, errors.NOVOMSPROXY, errors.CERTIFICATEHASEXPIRED}:
if exit_code in {errors.NOPROXY, errors.NOVOMSPROXY, errors.CERTIFICATEHASEXPIRED, errors.PROXYTOOSHORT}:
logger.warning(diagnostics)
return False

pilot/control/monitor.py (2 changes: 1 addition & 1 deletion)
@@ -109,7 +109,7 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
if tokendownloadchecktime and queuedata:
if int(time.time() - last_token_check) > tokendownloadchecktime:
last_token_check = time.time()
if 'no_token_renewal' in queuedata.catchall:
if 'no_token_renewal' in queuedata.catchall or args.token_renewal is False:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)
pilot/control/payloads/generic.py (2 changes: 1 addition & 1 deletion)
@@ -842,7 +842,7 @@ def run(self) -> tuple[int, str]: # noqa: C901
if stdout and stderr
else "General payload setup verification error (check setup logs)"
)
# check for special errors in thw output
# check for special errors in the output
exit_code = errors.resolve_transform_error(exit_code, diagnostics)
diagnostics = errors.format_diagnostics(exit_code, diagnostics)
return exit_code, diagnostics
pilot/info/jobdata.py (5 changes: 3 additions & 2 deletions)
@@ -16,7 +16,7 @@
# under the License.
#
# Authors:
# - Alexey Anisenkov, [email protected], 2018-19
# - Alexey Anisenkov, [email protected], 2018-24
# - Paul Nilsson, [email protected], 2018-24
# - Wen Guan, [email protected], 2018

@@ -177,6 +177,7 @@ class JobData(BaseData):
noexecstrcnv = None # server instruction to the pilot if it should take payload setup from job parameters
swrelease = "" # software release string
writetofile = "" #
nucleus = ""

# cmtconfig encoded info
alrbuserplatform = "" # ALRB_USER_PLATFORM encoded in platform/cmtconfig value
@@ -195,7 +196,7 @@
'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype',
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata)
'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets', 'prodproxy', 'alrbuserplatform',
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout'],
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus'],
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts', 'subprocesses',
'logdata', 'outdata', 'indata'],
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
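Since the new `nucleus` attribute defaults to the empty string, the `filter(None, [job.nucleus])` call in `_do_stageout` yields an empty exclusion list unless the server actually set a nucleus. Illustration only; the "CERN-PROD" value is a hypothetical example:

job_nucleus = ""                          # default: no nucleus assigned
print(list(filter(None, [job_nucleus])))  # [] -> nothing excluded from alt stage-out

job_nucleus = "CERN-PROD"                 # hypothetical nucleus from the job definition
print(list(filter(None, [job_nucleus])))  # ['CERN-PROD'] -> nucleus excluded as alt destination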
pilot/user/atlas/proxy.py (16 changes: 11 additions & 5 deletions)
@@ -90,7 +90,7 @@ def get_and_verify_proxy(x509: str, voms_role: str = '', proxy_type: str = '', w
return exit_code, diagnostics, x509


def verify_proxy(limit: int = None, x509: bool = None, proxy_id: str = "pilot", test: bool = False) -> tuple[int, str]:
def verify_proxy(limit: int = None, x509: bool = None, proxy_id: str = "pilot", test: bool = False, pilotstartup: bool = False) -> tuple[int, str]:
"""
Check for a valid voms/grid proxy longer than N hours.
@@ -100,8 +100,11 @@ def verify_proxy(limit: int = None, x509: bool = None, proxy_id: str = "pilot",
:param x509: points to the proxy file. If not set (=None) - get proxy file from X509_USER_PROXY environment (bool)
:param proxy_id: proxy id (str)
:param test: free Boolean test parameter (bool)
:param pilotstartup: True when verifying the proxy at pilot startup (bool)
:return: exit code (NOPROXY or NOVOMSPROXY) (int), diagnostics (error diagnostics string) (str) (tuple).
"""
if pilotstartup:
limit = 72 # 3 days
if limit is None:
limit = 1

@@ -161,9 +164,6 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo
# validityLeft - duration of proxy validity left in seconds.
# vomsACvalidityEnd - timestamp when VOMS attribute validity ends.
# vomsACvalidityLeft - duration of VOMS attribute validity left in seconds.
cmd = f"{envsetup}arcproxy -i subject"
_exit_code, _, _ = execute(cmd, shell=True) # , usecontainer=True, copytool=True)

cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft"
_exit_code, stdout, stderr = execute_nothreads(cmd, shell=True) # , usecontainer=True, copytool=True)
if stdout is not None:
@@ -173,6 +173,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo
exit_code = -1
else:
exit_code, diagnostics, validity_end_cert, validity_end = interpret_proxy_info(_exit_code, stdout, stderr, limit)
# validity_end = int(time()) + 71 * 3600 # 71 hours test

if proxy_id and validity_end: # setup cache if requested
if exit_code == 0:
@@ -222,7 +223,12 @@ def check_time_left(proxyname: str, validity: int, limit: int) -> tuple[int, str
logger.info(f"cache: check {proxyname} validity: wanted={limit}h ({limit * 3600 - 20 * 60}s with grace) "
f"left={float(seconds_left) / 3600:.2f}h (now={tnow} validity={validity} left={seconds_left}s)")

if seconds_left < limit * 3600 - 20 * 60:
# special case for limit=72h (3 days) for pilot startup
if limit == 72 and seconds_left < limit * 3600 - 20 * 60:
diagnostics = f'proxy is too short for pilot startup: {float(seconds_left) / 3600:.2f}h'
logger.warning(diagnostics)
exit_code = errors.PROXYTOOSHORT
elif seconds_left < limit * 3600 - 20 * 60:
diagnostics = f'cert/proxy is about to expire: {float(seconds_left) / 3600:.2f}h'
logger.warning(diagnostics)
exit_code = errors.CERTIFICATEHASEXPIRED if proxyname == 'cert' else errors.VOMSPROXYABOUTTOEXPIRE
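Putting the proxy changes together: at startup, verify_proxy(..., pilotstartup=True) forces limit = 72, and check_time_left() then reports the dedicated PROXYTOOSHORT error instead of the generic expiry one. A condensed sketch of that branch logic; the function name and string labels are hypothetical (the real code returns pilot error codes):

GRACE_S = 20 * 60  # same 20-minute grace period used by check_time_left()

def classify_lifetime(seconds_left: int, limit_h: int) -> str:
    """Map the remaining proxy lifetime to an outcome label (sketch)."""
    if limit_h == 72 and seconds_left < limit_h * 3600 - GRACE_S:
        return "PROXYTOOSHORT"    # pilot-startup case: proxy shorter than 3 days
    if seconds_left < limit_h * 3600 - GRACE_S:
        return "ABOUT_TO_EXPIRE"  # ordinary case
    return "OK"

print(classify_lifetime(71 * 3600, 72))  # PROXYTOOSHORT (matches the commented-out '71 hours test' above)
print(classify_lifetime(2 * 3600, 1))    # OK
print(classify_lifetime(1800, 1))        # ABOUT_TO_EXPIRE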
pilot/user/generic/proxy.py (3 changes: 2 additions & 1 deletion)
@@ -27,7 +27,7 @@
logger = logging.getLogger(__name__)


def verify_proxy(limit: int = None, x509: str = None, proxy_id: str = "pilot", test: bool = False) -> (int, str):
def verify_proxy(limit: int = None, x509: str = None, proxy_id: str = "pilot", test: bool = False, pilotstartup: bool = False) -> (int, str):
"""
Check for a valid voms/grid proxy longer than N hours.
@@ -37,6 +37,7 @@ def verify_proxy(limit: int = None, x509: str = None, proxy_id: str = "pilot", t
:param x509: points to the proxy file. If not set (=None) - get proxy file from X509_USER_PROXY environment (str)
:param proxy_id: proxy id (str)
:param test: free Boolean test parameter (bool)
:param pilotstartup: True when verifying the proxy at pilot startup (bool)
:return: exit code (NOPROXY or NOVOMSPROXY), diagnostics (error diagnostics string) (int, str).
"""
if limit or x509 or proxy_id or test: # to bypass pylint score 0