Commit

Merge pull request PanDAWMS#154 from PanDAWMS/next
3.9.2.41
PalNilsson authored Dec 5, 2024
2 parents 41ed500 + 9edf4e8 commit 95f7af1
Showing 37 changed files with 790 additions and 305 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.9.1.13
3.9.2.41
14 changes: 11 additions & 3 deletions pilot.py
@@ -147,7 +147,7 @@ def main() -> int:
return error.get_error_code()

# update the OIDC token if necessary (after queuedata has been downloaded, since PQ.catchall can contain instruction to prevent token renewal)
if 'no_token_renewal' in infosys.queuedata.catchall:
if 'no_token_renewal' in infosys.queuedata.catchall or args.token_renewal is False:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)
@@ -182,8 +182,6 @@ def main() -> int:
f"pilot.workflow.{args.workflow}", globals(), locals(), [args.workflow], 0
)

# check if real-time logging is requested for this queue
#rtloggingtype
# update the pilot heartbeat file
update_pilot_heartbeat(time.time())

@@ -451,6 +449,16 @@ def get_args() -> Any:
help="Maximum number of getjob request failures in Harvester mode",
)

# no_token_renewal
arg_parser.add_argument(
"-y",
"--notokenrenewal",
dest="token_renewal",
action="store_false",
default=True,
help="Disable token renewal",
)

arg_parser.add_argument(
"--subscribe-to-msgsvc",
dest="subscribe_to_msgsvc",
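A minimal standalone sketch (plain argparse, not pilot code) of the inverted-flag pattern the new option uses: -y/--notokenrenewal stores False into args.token_renewal, which otherwise defaults to True, so both pilot.py and pilot/control/monitor.py can simply test args.token_renewal is False.

import argparse

# Reproduce the new option in isolation: a negative flag name backed
# by a positive attribute that defaults to True.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-y",
    "--notokenrenewal",
    dest="token_renewal",
    action="store_false",
    default=True,
    help="Disable token renewal",
)

print(parser.parse_args([]).token_renewal)                    # True
print(parser.parse_args(["--notokenrenewal"]).token_renewal)  # False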
52 changes: 36 additions & 16 deletions pilot/api/data.py
@@ -20,7 +20,7 @@
# - Mario Lassnig, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2024
# - Tobias Wegner, [email protected], 2017-2018
# - Alexey Anisenkov, [email protected], 2018-2019
# - Alexey Anisenkov, [email protected], 2018-2024

"""API for data transfers."""

@@ -1072,16 +1072,20 @@ class StageOutClient(StagingClient):

mode = "stage-out"

def prepare_destinations(self, files: list, activities: list or str) -> list:
def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = None) -> list:
"""
Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`.
Apply Pilot-side logic to choose the proper destination.
:param files: list of FileSpec objects to be processed (list)
:param activities: ordered list of activities to be used to resolve astorages (list or str)
:param alt_exclude: global list of destinations to be excluded from alternative stage-out (list or None)
:return: updated fspec entries (list).
"""
if alt_exclude is None:  # a [] default would trigger pylint's mutable-default warning, hence None
alt_exclude = []

if not self.infosys.queuedata:  # infosys is not initialized: unable to fix destinations if needed, nothing to do
return files

@@ -1108,11 +1112,26 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
raise PilotException(f"Failed to resolve destination: no associated storages defined for activity={activity} ({act})",
code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')

# take the first choice for now, extend the logic later if needed
ddm = storages[0]
ddm_alt = storages[1] if len(storages) > 1 else None
def resolve_alt_destination(primary, exclude=None):
    """Resolve the alternative destination: the next `storages` entry after `primary`, skipping `primary` and anything in `exclude`."""
    cur = storages.index(primary) if primary in storages else 0
    exclude = set([primary] + list(exclude if exclude is not None else []))
    alt = None
    for _ in range(len(exclude)):  # several tries, to be able to skip over excluded entries
        inext = (cur + 1) % len(storages)  # cycle storages, start from the beginning when the end is reached
        if storages[inext] not in exclude:
            alt = storages[inext]
            break
        cur += 1
    return alt

# default destination
ddm = storages[0]  # take the first choice for now, extend the logic later if needed
ddm_alt = resolve_alt_destination(ddm, exclude=alt_exclude)

self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}")
self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}, alt_exclude={alt_exclude}")
self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}")

for e in files:
@@ -1121,17 +1140,18 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
" .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt)
e.ddmendpoint = ddm
e.ddmendpoint_alt = ddm_alt
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination
#elif e.ddmendpoint not in storages and is_unified: ## customize nucleus logic if need
# pass
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => use it as (non local) alternative destination
self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
" .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint # verify me
e.ddmendpoint = ddm
else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list)
cur = storages.index(e.ddmendpoint)
ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end
e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None
self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)
" .. will consider default ddm=%s as primary and set %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint if e.ddmendpoint not in alt_exclude else None
e.ddmendpoint = ddm # use default destination, check/verify nucleus case
else: # set corresponding ddmendpoint_alt if exist (next entry in cycled storages list)
e.ddmendpoint_alt = resolve_alt_destination(e.ddmendpoint, exclude=alt_exclude)

self.logger.info("[prepare_destinations][%s]: use ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)

return files

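To see the cycling behaviour in isolation, here is a standalone restatement of the alternative-destination lookup (the in-tree version closes over `storages` and caps the number of tries at len(exclude); the storage names below are purely illustrative):

# Return the entry following `primary` in `storages`, skipping
# `primary` itself and anything listed in `exclude`.
def resolve_alt_destination(storages: list, primary: str, exclude: list = None):
    cur = storages.index(primary) if primary in storages else 0
    excluded = set([primary] + list(exclude or []))
    for _ in range(len(storages)):  # at most one full cycle through the list
        cur = (cur + 1) % len(storages)
        if storages[cur] not in excluded:
            return storages[cur]
    return None  # every candidate was excluded

storages = ["CERN_DATADISK", "BNL_DATADISK", "NDGF_DATADISK"]
print(resolve_alt_destination(storages, "CERN_DATADISK"))                            # BNL_DATADISK
print(resolve_alt_destination(storages, "CERN_DATADISK", exclude=["BNL_DATADISK"]))  # NDGF_DATADISK
print(resolve_alt_destination(["CERN_DATADISK"], "CERN_DATADISK"))                   # None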
2 changes: 2 additions & 0 deletions pilot/common/errorcodes.py
@@ -182,6 +182,7 @@ class ErrorCodes:
PREEMPTION = 1379
ARCPROXYFAILURE = 1380
ARCPROXYLIBFAILURE = 1381
PROXYTOOSHORT = 1382 # used at the beginning of the pilot to indicate that the proxy is too short

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
@@ -326,6 +327,7 @@ class ErrorCodes:
PREEMPTION: "Job was preempted",
ARCPROXYFAILURE: "General arcproxy failure",
ARCPROXYLIBFAILURE: "Arcproxy failure while loading shared libraries",
PROXYTOOSHORT: "Proxy is too short",
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
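The new PROXYTOOSHORT code (1382) is raised at pilot startup when the proxy lifetime falls below the 72-hour minimum referenced in pilot/control/job.py below. A hypothetical sketch of such a check (the helper name and threshold constant are illustrative, not pilot code):

from pilot.common.errorcodes import ErrorCodes

MIN_STARTUP_PROXY_LIFETIME = 72 * 3600  # seconds; see the verify_proxy(pilotstartup=True) call below

def check_startup_proxy(seconds_left: int) -> tuple:
    # Return (exit_code, diagnostics) in the style of verify_proxy()
    if seconds_left < MIN_STARTUP_PROXY_LIFETIME:
        return ErrorCodes.PROXYTOOSHORT, f"proxy lifetime {seconds_left}s is below the required {MIN_STARTUP_PROXY_LIFETIME}s"
    return 0, ""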
25 changes: 17 additions & 8 deletions pilot/control/data.py
@@ -328,6 +328,8 @@ def _stage_in(args: object, job: JobData) -> bool:
logger.info(" -- lfn=%s, status_code=%s, status=%s", infile.lfn, infile.status_code, status)

# write time stamps to pilot timing file

# MOVE THIS TO AFTER REMOTE FILE OPEN HAS BEEN VERIFIED (actually just before the payload starts)
add_to_pilot_timing(job.jobid, PILOT_POST_STAGEIN, time.time(), args)

remain_files = [infile for infile in job.indata if infile.status not in ['remote_io', 'transferred', 'no_transfer']]
@@ -917,19 +919,22 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job,
'output_dir': args.output_dir, 'catchall': job.infosys.queuedata.catchall,
'rucio_host': args.rucio_host} #, mode='stage-out')
is_unified = job.infosys.queuedata.type == 'unified'
#is_unified = job.infosys.queuedata.type == 'unified'
# prod analy unification: use destination preferences from PanDA server for unified queues
if not is_unified:
client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
#if not is_unified:
# client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)

## FIX ME LATER: split activities: for `astorages` and `copytools` (to unify with ES workflow)
client.prepare_destinations(xdata, activity, alt_exclude=list(filter(None, [job.nucleus])))

altstageout = not is_unified and job.allow_altstageout() # do not use alt stage-out for unified queues
altstageout = job.allow_altstageout()
client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
remain_files = [entry for entry in xdata if entry.require_transfer()]
# check if alt stage-out can be applied (all remaining files must have an alternative storage declared in ddmendpoint_alt)
has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files)

logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s',
activity, is_unified, altstageout, len(remain_files), has_altstorage)
logger.info('alt stage-out settings: %s, allow_altstageout=%s, remain_files=%s, has_altstorage=%s',
activity, altstageout, len(remain_files), has_altstorage)

if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers
for entry in remain_files:
@@ -992,8 +997,12 @@ def _stage_out_new(job: JobData, args: object) -> bool:
logger.info('this job does not have any output files, only stage-out log file')
job.stageout = 'log'

is_unified = job.infosys.queuedata.type == 'unified'
is_analysis = job.is_analysis()
activities = ['write_lan_analysis', 'write_lan', 'w'] if is_unified and is_analysis else ['write_lan', 'w']

if job.stageout != 'log': ## do stage-out output files
if not _do_stageout(job, args, job.outdata, ['pw', 'w'], title='output',
if not _do_stageout(job, args, job.outdata, activities, title='output',
ipv=args.internet_protocol_version):
is_success = False
logger.warning('transfer of output file(s) failed')
@@ -1037,7 +1046,7 @@ def _stage_out_new(job: JobData, args: object) -> bool:
# write time stamps to pilot timing file
add_to_pilot_timing(job.jobid, PILOT_POST_LOG_TAR, time.time(), args)

if not _do_stageout(job, args, [logfile], ['pl', 'pw', 'w'], title='log',
if not _do_stageout(job, args, [logfile], ['pl'] + activities, title='log',
ipv=args.internet_protocol_version):
is_success = False
logger.warning('log transfer failed')
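Two pieces of new logic in this file can be verified in isolation: the activity list now prefers write_lan_analysis for analysis jobs on unified queues, and filter(None, [job.nucleus]) turns an unset nucleus into an empty exclusion list. A small illustrative sketch (the queue type, analysis flag, and nucleus name are made-up inputs):

def stageout_activities(queue_type: str, is_analysis: bool) -> list:
    # Mirror the activity selection in _stage_out_new()
    if queue_type == "unified" and is_analysis:
        return ["write_lan_analysis", "write_lan", "w"]
    return ["write_lan", "w"]

print(stageout_activities("unified", True))     # ['write_lan_analysis', 'write_lan', 'w']
print(stageout_activities("production", True))  # ['write_lan', 'w']

# filter(None, ...) drops a None/empty nucleus cleanly
print(list(filter(None, ["BNL-OSG2"])))  # ['BNL-OSG2'] - nucleus excluded from alt stage-out
print(list(filter(None, [None])))        # [] - no nucleus, nothing excluded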
15 changes: 9 additions & 6 deletions pilot/control/job.py
@@ -1550,13 +1550,13 @@ def proceed_with_getjob(timefloor: int, starttime: int, jobnumber: int, getjob_r
if verify_proxy:
userproxy = __import__(f'pilot.user.{pilot_user}.proxy', globals(), locals(), [pilot_user], 0)

# is the proxy still valid?
exit_code, diagnostics = userproxy.verify_proxy(test=False)
# is the proxy still valid? at pilot startup, the proxy lifetime must be at least 72h
exit_code, diagnostics = userproxy.verify_proxy(test=False, pilotstartup=True)
if traces.pilot['error_code'] == 0: # careful so we don't overwrite another error code
traces.pilot['error_code'] = exit_code
if exit_code == errors.ARCPROXYLIBFAILURE:
logger.warning("currently ignoring arcproxy library failure")
if exit_code in {errors.NOPROXY, errors.NOVOMSPROXY, errors.CERTIFICATEHASEXPIRED}:
if exit_code in {errors.NOPROXY, errors.NOVOMSPROXY, errors.CERTIFICATEHASEXPIRED, errors.PROXYTOOSHORT}:
logger.warning(diagnostics)
return False

@@ -1684,10 +1684,13 @@ def get_job_definition_from_server(args: Any, taskid: str = "") -> str:
cmd = https.get_server_command(args.url, args.port)
if cmd != "":
logger.info(f'executing server command: {cmd}')
res = https.request2(cmd, data=data, panda=True) # will be a dictionary
logger.debug(f"request2 response: {res}") # should be StatusCode=0 if all is ok
if not res: # fallback to curl solution
if "curlgetjob" in infosys.queuedata.catchall:
res = https.request(cmd, data=data)
else:
res = https.request2(cmd, data=data, panda=True) # will be a dictionary
logger.debug(f"request2 response: {res}") # should be StatusCode=0 if all is ok
if not res: # fallback to curl solution
res = https.request(cmd, data=data)

return res

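The getjob change reorders the transport selection: a queue can force the legacy curl-based client via the curlgetjob catchall flag; otherwise https.request2() is tried first, with https.request() as fallback. A sketch of the resulting dispatch (assuming the pilot.util.https helpers used in the hunk; not a verbatim copy of the function):

from pilot.util import https

def fetch_job_definition(cmd: str, data: dict, catchall: str):
    if "curlgetjob" in catchall:  # queue explicitly requests the curl path
        return https.request(cmd, data=data)
    res = https.request2(cmd, data=data, panda=True)  # dictionary when successful
    if not res:  # fall back to the curl-based client
        res = https.request(cmd, data=data)
    return res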
2 changes: 1 addition & 1 deletion pilot/control/monitor.py
@@ -109,7 +109,7 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
if tokendownloadchecktime and queuedata:
if int(time.time() - last_token_check) > tokendownloadchecktime:
last_token_check = time.time()
if 'no_token_renewal' in queuedata.catchall:
if 'no_token_renewal' in queuedata.catchall or args.token_renewal is False:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)
25 changes: 24 additions & 1 deletion pilot/control/payload.py
@@ -744,7 +744,30 @@ def scan_for_memory_errors(subprocesses: list) -> str:
if search_str in line:
diagnostics = line[line.find(search_str):]
logger.warning(f'found memory error: {diagnostics}')
break

# make sure that this message is for a true subprocess of the pilot
# extract the pid from the message and compare it to the subprocesses list
match = search(r'Killed process (\d+)', diagnostics)
if match:
try:
found_pid = int(match.group(1))
logger.info(f"extracted PID: {found_pid}")

# is it a known subprocess?
if found_pid in subprocesses:
logger.info("PID found in the list of subprocesses")
break
else:
logger.warning("the extracted PID is not a known subprocess of the payload")
diagnostics = ""
# is the extracted PID a subprocess of the main pilot process itself?

except (ValueError, TypeError, AttributeError) as e:
logger.warning(f"failed to extract PID from the message: {e}")
diagnostics = ""
else:
logger.warning("PID could not be extracted from the message")
diagnostics = ""

if diagnostics:
break
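The new guard accepts a kernel OOM message only when the killed PID belongs to a known payload subprocess, preventing the pilot from attributing another process's OOM kill to its own job. A minimal sketch of the check (the sample dmesg line is illustrative):

from re import search

def is_payload_oom(line: str, subprocesses: list) -> bool:
    # Extract the PID from a 'Killed process <pid>' message and accept
    # it only if it is a known payload subprocess.
    match = search(r'Killed process (\d+)', line)
    return bool(match) and int(match.group(1)) in subprocesses

line = "Out of memory: Killed process 12345 (athena.py)"
print(is_payload_oom(line, [12345]))  # True - known payload subprocess
print(is_payload_oom(line, [42]))     # False - someone else's process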