alt stage-out for unified queues #152

Merged
merged 28 commits on Nov 22, 2024
Commits (28)
528a6b0
Corrected a few type hints, removed unused code
Oct 18, 2024
3e04dc6
execute() now using thread synchronization to protect against polling…
Oct 18, 2024
17f1b0a
Using start_new_session instead of preexec_fn in subprocess.Popen(). …
Oct 21, 2024
f58d93c
Various tests for oom score, now out-commented
Oct 28, 2024
8556790
Added new option for turning off token renewal
Oct 28, 2024
831cfa4
Added new option for turning off token renewal
Oct 28, 2024
162dc16
Now locating correct payload pid for oom score
PalNilsson Oct 29, 2024
f488558
Testing oom write
Oct 30, 2024
b93f3cc
Removed useless arcproxy -i subject
PalNilsson Oct 31, 2024
b1c4549
Cleanup
PalNilsson Oct 31, 2024
0b5ff5f
Added PROXYTOOSHORT
PalNilsson Oct 31, 2024
dae81f3
Added PROXYTOOSHORT as pilot exit code 80
PalNilsson Oct 31, 2024
15037f9
Implementing and testing proxy too short
PalNilsson Oct 31, 2024
5ab7204
Removed test
PalNilsson Oct 31, 2024
d89faa3
Updated build number
PalNilsson Oct 31, 2024
5ece001
Removed proxy name check
Oct 31, 2024
3d499d8
Pylint updates
Nov 1, 2024
5a02856
Pylint updates
Nov 1, 2024
f495201
Pylint updates
Nov 1, 2024
fc76bce
Pylint updates
Nov 1, 2024
cdfc37f
Pylint updates
Nov 1, 2024
ca8683d
Pylint updates
Nov 1, 2024
e20a831
Pylint updates
Nov 1, 2024
f60beed
Pylint updates
Nov 4, 2024
cbdced9
IPv4 support in request2()
PalNilsson Nov 6, 2024
b9825c4
Added support for http_proxy
Nov 7, 2024
87123d3
enable altstageout for unified queues; nucleus use-case should be che…
anisyonk Nov 8, 2024
024cd70
modify altstage out logic (exclude nucleus)
anisyonk Nov 14, 2024
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.9.1.13
3.9.2.23b
14 changes: 11 additions & 3 deletions pilot.py
@@ -147,7 +147,7 @@ def main() -> int:
return error.get_error_code()

# update the OIDC token if necessary (after queuedata has been downloaded, since PQ.catchall can contain instruction to prevent token renewal)
if 'no_token_renewal' in infosys.queuedata.catchall:
if 'no_token_renewal' in infosys.queuedata.catchall or args.token_renewal is False:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)
@@ -182,8 +182,6 @@ def main() -> int:
f"pilot.workflow.{args.workflow}", globals(), locals(), [args.workflow], 0
)

# check if real-time logging is requested for this queue
#rtloggingtype
# update the pilot heartbeat file
update_pilot_heartbeat(time.time())

@@ -451,6 +449,16 @@ def get_args() -> Any:
help="Maximum number of getjob request failures in Harvester mode",
)

# no_token_renewal
arg_parser.add_argument(
"-y",
"--notokenrenewal",
dest="token_renewal",
action="store_false",
default=True,
help="Disable token renewal",
)

arg_parser.add_argument(
"--subscribe-to-msgsvc",
dest="subscribe_to_msgsvc",
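For reference, a minimal standalone sketch of how the new `store_false` flag behaves; the parser options are taken from the diff above, while the demo invocation at the end is illustrative only:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "-y",
    "--notokenrenewal",
    dest="token_renewal",
    action="store_false",  # passing the flag sets token_renewal to False
    default=True,          # omitting the flag leaves token_renewal True
    help="Disable token renewal",
)

args = parser.parse_args(["--notokenrenewal"])
assert args.token_renewal is False  # renewal disabled only when the flag is given
```

With `token_renewal` defaulting to True, the `or args.token_renewal is False` checks in pilot.py and monitor.py only trigger when the flag is explicitly passed.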
49 changes: 33 additions & 16 deletions pilot/api/data.py
@@ -20,7 +20,7 @@
# - Mario Lassnig, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2024
# - Tobias Wegner, [email protected], 2017-2018
# - Alexey Anisenkov, [email protected], 2018-2019
# - Alexey Anisenkov, [email protected], 2018-2024

"""API for data transfers."""

@@ -1072,14 +1072,15 @@ class StageOutClient(StagingClient):

mode = "stage-out"

def prepare_destinations(self, files: list, activities: list or str) -> list:
def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = []) -> list:
Review comment (collaborator):
Pylint will complain with "W0102: Dangerous default value [] as argument (dangerous-default-value)" here. I will change it to `alt_exclude: list = None` and then add `if alt_exclude is None: alt_exclude = []`.
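A minimal sketch of the suggested fix:

```python
def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = None) -> list:
    # avoid the mutable default argument flagged by pylint W0102
    if alt_exclude is None:
        alt_exclude = []
    ...
```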

"""
Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`.

Apply Pilot-side logic to choose proper destination.

:param files: list of FileSpec objects to be processed (list)
:param activities: ordered list of activities to be used to resolve astorages (list or str)
:param alt_exclude: global list of destinations that should be excluded / not used for alternative stage-out (list)
:return: updated fspec entries (list).
"""
if not self.infosys.queuedata:  # infosys is not initialized: not able to fix destination if needed, nothing to do
Expand Down Expand Up @@ -1108,11 +1109,26 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
raise PilotException(f"Failed to resolve destination: no associated storages defined for activity={activity} ({act})",
code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')

# take the fist choice for now, extend the logic later if need
ddm = storages[0]
ddm_alt = storages[1] if len(storages) > 1 else None
def resolve_alt_destination(primary, exclude=None):
""" resolve alt destination as the next to primary entry not equal to `primary` and `exclude` """

cur = storages.index(primary) if primary in storages else 0
inext = (cur + 1) % len(storages) # cycle storages, take the first elem when reach end
exclude = set([primary] + list(exclude if exclude is not None else []))
alt = None
for attempt in range(len(exclude) or 1):  # make several attempts to skip over excluded entries (duplicates may be stacked)
inext = (cur + 1) % len(storages)  # cycle through storages, wrapping to the start when the end is reached
if storages[inext] not in exclude:
alt = storages[inext]
break
cur += 1
return alt

# default destination
ddm = storages[0]  # take the first choice for now, extend the logic later if needed
ddm_alt = resolve_alt_destination(ddm, exclude=alt_exclude)

self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}")
self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}, alt_exclude={alt_exclude}")
self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}")

for e in files:
@@ -1121,17 +1137,18 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
" .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt)
e.ddmendpoint = ddm
e.ddmendpoint_alt = ddm_alt
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination
#elif e.ddmendpoint not in storages and is_unified: ## customize nucleus logic if need
# pass
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => use it as (non local) alternative destination
self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
" .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint # verify me
e.ddmendpoint = ddm
else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list)
cur = storages.index(e.ddmendpoint)
ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end
e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None
self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)
" .. will consider default ddm=%s as primary and set %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint if e.ddmendpoint not in alt_exclude else None
e.ddmendpoint = ddm # use default destination, check/verify nucleus case
else: # set corresponding ddmendpoint_alt if exist (next entry in cycled storages list)
e.ddmendpoint_alt = resolve_alt_destination(e.ddmendpoint, exclude=alt_exclude)

self.logger.info("[prepare_destinations][%s]: use ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)

return files

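For clarity, a self-contained restatement of the cycling logic in `resolve_alt_destination` above. Here `storages` is passed explicitly (in the diff it is captured from the enclosing scope), and the loop bound is one full pass over the list rather than the diff's `len(exclude) or 1` attempts; the RSE names are hypothetical:

```python
def resolve_alt_destination(storages: list, primary: str, exclude: list = None):
    """Pick the next entry after `primary`, skipping `primary` itself and `exclude`."""
    excluded = set([primary] + list(exclude or []))
    cur = storages.index(primary) if primary in storages else 0
    for _ in range(len(storages)):        # at most one full pass over the list
        cur = (cur + 1) % len(storages)   # wrap around at the end
        if storages[cur] not in excluded:
            return storages[cur]
    return None                           # every candidate was excluded

storages = ["RSE_A", "RSE_B", "RSE_C"]    # hypothetical endpoints
assert resolve_alt_destination(storages, "RSE_A") == "RSE_B"
assert resolve_alt_destination(storages, "RSE_A", exclude=["RSE_B"]) == "RSE_C"
assert resolve_alt_destination(storages, "RSE_A", exclude=["RSE_B", "RSE_C"]) is None
```

Passing `alt_exclude=list(filter(None, [job.nucleus]))` from `_do_stageout()` (see pilot/control/data.py below) is what keeps the nucleus from ever being picked as an alternative destination.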
2 changes: 2 additions & 0 deletions pilot/common/errorcodes.py
@@ -182,6 +182,7 @@ class ErrorCodes:
PREEMPTION = 1379
ARCPROXYFAILURE = 1380
ARCPROXYLIBFAILURE = 1381
PROXYTOOSHORT = 1382 # used at the beginning of the pilot to indicate that the proxy is too short

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
@@ -326,6 +327,7 @@ class ErrorCodes:
PREEMPTION: "Job was preempted",
ARCPROXYFAILURE: "General arcproxy failure",
ARCPROXYLIBFAILURE: "Arcproxy failure while loading shared libraries",
PROXYTOOSHORT: "Proxy is too short",
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
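The addition follows the file's existing pattern: a numeric constant plus a matching `_error_messages` entry (and, per the commit messages, a mapping to pilot exit code 80 elsewhere). A trivial illustration of the lookup, simplified from the class:

```python
PROXYTOOSHORT = 1382
_error_messages = {PROXYTOOSHORT: "Proxy is too short"}

print(_error_messages.get(PROXYTOOSHORT, "unknown error code"))  # -> Proxy is too short
```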
23 changes: 15 additions & 8 deletions pilot/control/data.py
@@ -917,19 +917,22 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job,
'output_dir': args.output_dir, 'catchall': job.infosys.queuedata.catchall,
'rucio_host': args.rucio_host} #, mode='stage-out')
is_unified = job.infosys.queuedata.type == 'unified'
#is_unified = job.infosys.queuedata.type == 'unified'
# prod analy unification: use destination preferences from PanDA server for unified queues
if not is_unified:
client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
#if not is_unified:
# client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)

altstageout = not is_unified and job.allow_altstageout() # do not use alt stage-out for unified queues
## FIX ME LATER: split activities: for `astorages` and `copytools` (to unify with ES workflow)
client.prepare_destinations(xdata, activity, alt_exclude=list(filter(None, [job.nucleus])))

altstageout = job.allow_altstageout()
client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
remain_files = [entry for entry in xdata if entry.require_transfer()]
# check if alt stage-out can be applied (all remaining files must have an alt storage declared in ddmendpoint_alt)
has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files)

logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s',
activity, is_unified, altstageout, len(remain_files), has_altstorage)
logger.info('alt stage-out settings: %s, allow_altstageout=%s, remain_files=%s, has_altstorage=%s',
activity, altstageout, len(remain_files), has_altstorage)

if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers
for entry in remain_files:
@@ -992,8 +995,12 @@ def _stage_out_new(job: JobData, args: object) -> bool:
logger.info('this job does not have any output files, only stage-out log file')
job.stageout = 'log'

is_unified = job.infosys.queuedata.type == 'unified'
is_analysis = job.is_analysis()
activities = ['write_lan_analysis', 'write_lan', 'w'] if is_unified and is_analysis else ['write_lan', 'w']

if job.stageout != 'log': ## do stage-out output files
if not _do_stageout(job, args, job.outdata, ['pw', 'w'], title='output',
if not _do_stageout(job, args, job.outdata, activities, title='output',
ipv=args.internet_protocol_version):
is_success = False
logger.warning('transfer of output file(s) failed')
@@ -1037,7 +1044,7 @@ def _stage_out_new(job: JobData, args: object) -> bool:
# write time stamps to pilot timing file
add_to_pilot_timing(job.jobid, PILOT_POST_LOG_TAR, time.time(), args)

if not _do_stageout(job, args, [logfile], ['pl', 'pw', 'w'], title='log',
if not _do_stageout(job, args, [logfile], ['pl'] + activities, title='log',
ipv=args.internet_protocol_version):
is_success = False
logger.warning('log transfer failed')
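A condensed sketch of the alt stage-out control flow in `_do_stageout` above. `client` and the entries are stand-ins, and the body of the retry loop is an assumption, since the diff truncates right after the `for entry in remain_files:` line:

```python
def do_stageout_with_alt(client, xdata: list, activity: list, altstageout: bool, **kwargs) -> None:
    # primary attempt: only raise immediately when alt stage-out is not allowed
    client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
    remain_files = [entry for entry in xdata if entry.require_transfer()]
    # alt stage-out applies only if every remaining file has a distinct alternative endpoint
    has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt
                         for entry in remain_files)
    if altstageout and remain_files and has_altstorage:
        for entry in remain_files:
            # assumed: promote the alternative endpoint and retry the failed transfers
            entry.ddmendpoint = entry.ddmendpoint_alt
            entry.ddmendpoint_alt = None
        client.transfer(remain_files, activity, raise_exception=True, **kwargs)
```

Together with the unconditional `prepare_destinations(xdata, activity, alt_exclude=list(filter(None, [job.nucleus])))` call, this replaces the old blanket `is_unified` guard: unified queues may now use alt stage-out, but never toward the nucleus.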
6 changes: 3 additions & 3 deletions pilot/control/job.py
@@ -1550,13 +1550,13 @@ def proceed_with_getjob(timefloor: int, starttime: int, jobnumber: int, getjob_r
if verify_proxy:
userproxy = __import__(f'pilot.user.{pilot_user}.proxy', globals(), locals(), [pilot_user], 0)

# is the proxy still valid?
exit_code, diagnostics = userproxy.verify_proxy(test=False)
# is the proxy still valid? at pilot startup, the proxy lifetime must be at least 72h
exit_code, diagnostics = userproxy.verify_proxy(test=False, pilotstartup=True)
if traces.pilot['error_code'] == 0: # careful so we don't overwrite another error code
traces.pilot['error_code'] = exit_code
if exit_code == errors.ARCPROXYLIBFAILURE:
logger.warning("currently ignoring arcproxy library failure")
if exit_code in {errors.NOPROXY, errors.NOVOMSPROXY, errors.CERTIFICATEHASEXPIRED}:
if exit_code in {errors.NOPROXY, errors.NOVOMSPROXY, errors.CERTIFICATEHASEXPIRED, errors.PROXYTOOSHORT}:
logger.warning(diagnostics)
return False

2 changes: 1 addition & 1 deletion pilot/control/monitor.py
@@ -109,7 +109,7 @@ def control(queues: namedtuple, traces: Any, args: object):  # noqa: C901
if tokendownloadchecktime and queuedata:
if int(time.time() - last_token_check) > tokendownloadchecktime:
last_token_check = time.time()
if 'no_token_renewal' in queuedata.catchall:
if 'no_token_renewal' in queuedata.catchall or args.token_renewal is False:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)
2 changes: 1 addition & 1 deletion pilot/control/payloads/generic.py
@@ -842,7 +842,7 @@ def run(self) -> tuple[int, str]:  # noqa: C901
if stdout and stderr
else "General payload setup verification error (check setup logs)"
)
# check for special errors in thw output
# check for special errors in the output
exit_code = errors.resolve_transform_error(exit_code, diagnostics)
diagnostics = errors.format_diagnostics(exit_code, diagnostics)
return exit_code, diagnostics
5 changes: 3 additions & 2 deletions pilot/info/jobdata.py
@@ -16,7 +16,7 @@
# under the License.
#
# Authors:
# - Alexey Anisenkov, [email protected], 2018-19
# - Alexey Anisenkov, [email protected], 2018-24
# - Paul Nilsson, [email protected], 2018-24
# - Wen Guan, [email protected], 2018

@@ -177,6 +177,7 @@ class JobData(BaseData):
noexecstrcnv = None # server instruction to the pilot if it should take payload setup from job parameters
swrelease = "" # software release string
writetofile = "" #
nucleus = ""

# cmtconfig encoded info
alrbuserplatform = "" # ALRB_USER_PLATFORM encoded in platform/cmtconfig value
@@ -195,7 +196,7 @@ class JobData(BaseData):
'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype',
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata)
'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets', 'prodproxy', 'alrbuserplatform',
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout'],
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus'],
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts', 'subprocesses',
'logdata', 'outdata', 'indata'],
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
16 changes: 11 additions & 5 deletions pilot/user/atlas/proxy.py
@@ -90,7 +90,7 @@ def get_and_verify_proxy(x509: str, voms_role: str = '', proxy_type: str = '', w
return exit_code, diagnostics, x509


def verify_proxy(limit: int = None, x509: bool = None, proxy_id: str = "pilot", test: bool = False) -> tuple[int, str]:
def verify_proxy(limit: int = None, x509: bool = None, proxy_id: str = "pilot", test: bool = False, pilotstartup: bool = False) -> tuple[int, str]:
"""
Check for a valid voms/grid proxy longer than N hours.

@@ -100,8 +100,11 @@ def verify_proxy(limit: int = None, x509: bool = None, proxy_id: str = "pilot",
:param x509: points to the proxy file. If not set (=None) - get proxy file from X509_USER_PROXY environment (bool)
:param proxy_id: proxy id (str)
:param test: free Boolean test parameter (bool)
:param pilotstartup: True if the check is being done at pilot startup (bool)
:return: exit code (NOPROXY or NOVOMSPROXY) (int), diagnostics (error diagnostics string) (str) (tuple).
"""
if pilotstartup:
limit = 72 # 3 days
if limit is None:
limit = 1

@@ -161,9 +164,6 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo
# validityLeft - duration of proxy validity left in seconds.
# vomsACvalidityEnd - timestamp when VOMS attribute validity ends.
# vomsACvalidityLeft - duration of VOMS attribute validity left in seconds.
cmd = f"{envsetup}arcproxy -i subject"
_exit_code, _, _ = execute(cmd, shell=True) # , usecontainer=True, copytool=True)

cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft"
_exit_code, stdout, stderr = execute_nothreads(cmd, shell=True) # , usecontainer=True, copytool=True)
if stdout is not None:
@@ -173,6 +173,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo
exit_code = -1
else:
exit_code, diagnostics, validity_end_cert, validity_end = interpret_proxy_info(_exit_code, stdout, stderr, limit)
# validity_end = int(time()) + 71 * 3600 # 71 hours test

if proxy_id and validity_end: # setup cache if requested
if exit_code == 0:
@@ -222,7 +223,12 @@ def check_time_left(proxyname: str, validity: int, limit: int) -> tuple[int, str
logger.info(f"cache: check {proxyname} validity: wanted={limit}h ({limit * 3600 - 20 * 60}s with grace) "
f"left={float(seconds_left) / 3600:.2f}h (now={tnow} validity={validity} left={seconds_left}s)")

if seconds_left < limit * 3600 - 20 * 60:
# special case for limit=72h (3 days) for pilot startup
if limit == 72 and seconds_left < limit * 3600 - 20 * 60:
diagnostics = f'proxy is too short for pilot startup: {float(seconds_left) / 3600:.2f}h'
logger.warning(diagnostics)
exit_code = errors.PROXYTOOSHORT
elif seconds_left < limit * 3600 - 20 * 60:
diagnostics = f'cert/proxy is about to expire: {float(seconds_left) / 3600:.2f}h'
logger.warning(diagnostics)
exit_code = errors.CERTIFICATEHASEXPIRED if proxyname == 'cert' else errors.VOMSPROXYABOUTTOEXPIRE
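The startup check reduces to simple arithmetic; a sketch using the constants from `check_time_left()` above (72 h minimum at startup, 20 min grace):

```python
def too_short_at_startup(seconds_left: int, limit: int = 72) -> bool:
    grace = 20 * 60  # 20-minute grace period, as in check_time_left()
    return seconds_left < limit * 3600 - grace

assert too_short_at_startup(71 * 3600) is True    # 71 h left -> PROXYTOOSHORT
assert too_short_at_startup(73 * 3600) is False   # 73 h left -> proceed
```

The commented-out `validity_end = int(time()) + 71 * 3600` line in `verify_arcproxy()` is the test hook for exactly this path.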
3 changes: 2 additions & 1 deletion pilot/user/generic/proxy.py
@@ -27,7 +27,7 @@
logger = logging.getLogger(__name__)


def verify_proxy(limit: int = None, x509: str = None, proxy_id: str = "pilot", test: bool = False) -> (int, str):
def verify_proxy(limit: int = None, x509: str = None, proxy_id: str = "pilot", test: bool = False, pilotstartup: bool = False) -> (int, str):
"""
Check for a valid voms/grid proxy longer than N hours.

@@ -37,6 +37,7 @@ def verify_proxy(limit: int = None, x509: str = None, proxy_id: str = "pilot", t
:param x509: points to the proxy file. If not set (=None) - get proxy file from X509_USER_PROXY environment (str)
:param proxy_id: proxy id (str)
:param test: free Boolean test parameter (bool)
:param pilotstartup: True if the check is being done at pilot startup (bool)
:return: exit code (NOPROXY or NOVOMSPROXY), diagnostics (error diagnostics string) (int, str).
"""
if limit or x509 or proxy_id or test: # to bypass pylint score 0