diff --git a/PILOTVERSION b/PILOTVERSION
index 4b3244f5..e88f7664 100644
--- a/PILOTVERSION
+++ b/PILOTVERSION
@@ -1 +1 @@
-3.8.1.66
\ No newline at end of file
+3.8.2.8
\ No newline at end of file
diff --git a/pilot/api/data.py b/pilot/api/data.py
index 3305b47f..93cc76f1 100644
--- a/pilot/api/data.py
+++ b/pilot/api/data.py
@@ -87,8 +87,7 @@ def __init__(self,
                  default_copytools: str = 'rucio',
                  trace_report: dict = None,
                  ipv: str = 'IPv6',
-                 workdir: str = "",
-                 altstageout: str = None):
+                 workdir: str = ""):
         """
         Set default/init values.

@@ -113,7 +112,6 @@ def __init__(self,
         self.infosys = infosys_instance or infosys
         self.ipv = ipv
         self.workdir = workdir
-        self.altstageout = altstageout

         if isinstance(acopytools, str):
             acopytools = {'default': [acopytools]} if acopytools else {}
@@ -133,7 +131,7 @@ def __init__(self,
         self.trace_report = trace_report if trace_report else TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), ipv=self.ipv, workdir=self.workdir)

         if not self.acopytools:
-            msg = f'failed to initilize StagingClient: no acopytools options found, acopytools={self.acopytools}'
+            msg = f'failed to initialize StagingClient: no acopytools options found, acopytools={self.acopytools}'
             logger.error(msg)
             self.trace_report.update(clientState='BAD_COPYTOOL', stateReason=msg)
             self.trace_report.send()
@@ -522,7 +520,7 @@ def transfer_files(self, copytool: Any, files: list, activity: list, **kwargs: d
         """
         raise NotImplementedError()

-    def transfer(self, files: list, activity: list or str = 'default', **kwargs: dict) -> list:  # noqa: C901
+    def transfer(self, files: list, activity: list or str = 'default', raise_exception: bool = True, **kwargs: dict) -> list:  # noqa: C901
         """
         Perform file transfer.

@@ -530,8 +528,9 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic

         :param files: list of `FileSpec` objects (list)
         :param activity: list of activity names used to determine appropriate copytool (list or str)
+        :param raise_exception: if False, log transfer errors instead of raising a PilotException (bool)
         :param kwargs: extra kwargs to be passed to copytool transfer handler (dict)
-        :raise: PilotException in case of controlled error
+        :raise: PilotException in case of controlled error if `raise_exception` is `True`
         :return: list of processed `FileSpec` objects (list).
         """
         self.trace_report.update(relativeStart=time.time(), transferStart=time.time())
@@ -548,7 +547,8 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic
             break

         if not copytools:
-            raise PilotException(f'failed to resolve copytool by preferred activities={activity}, acopytools={self.acopytools}')
+            raise PilotException(f'failed to resolve copytool by preferred activities={activity}, acopytools={self.acopytools}',
+                                 code=ErrorCodes.UNKNOWNCOPYTOOL)

         # populate inputddms if needed
         self.prepare_inputddms(files)

@@ -633,7 +633,8 @@ def transfer(self, files: list, activity: list or str = 'default', **kwargs: dic
             code = ErrorCodes.STAGEINFAILED if self.mode == 'stage-in' else ErrorCodes.STAGEOUTFAILED  # is it stage-in/out?
             details = str(caught_errors) + ":" + f'failed to transfer files using copytools={copytools}'
             self.logger.fatal(details)
-            raise PilotException(details, code=code)
+            if raise_exception:
+                raise PilotException(details, code=code)

         return files

@@ -1109,20 +1110,28 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:

         # take the fist choice for now, extend the logic later if need
         ddm = storages[0]
+        ddm_alt = storages[1] if len(storages) > 1 else None

         self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}")
-        self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination ddm={ddm}")
+        self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}")

         for e in files:
             if not e.ddmendpoint:  # no preferences => use default destination
                 self.logger.info("[prepare_destinations][%s]: fspec.ddmendpoint is not set for lfn=%s"
-                                 " .. will use default ddm=%s as (local) destination", activity, e.lfn, ddm)
+                                 " .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt)
                 e.ddmendpoint = ddm
+                e.ddmendpoint_alt = ddm_alt
             elif e.ddmendpoint not in storages:  # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination
                 self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
                                  " .. will consider default ddm=%s for transfer and tag %s as alt. location",
                                  activity, e.ddmendpoint, ddm, e.ddmendpoint)
+                e.ddmendpoint_alt = e.ddmendpoint  # to be verified
                 e.ddmendpoint = ddm
-                e.ddmendpoint_alt = e.ddmendpoint  # consider me later
+            else:  # set the corresponding ddmendpoint_alt if it exists (next entry in the list of available storages)
+                cur = storages.index(e.ddmendpoint)
+                ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0]  # cycle through storages, wrap around to the first element at the end
+                e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None
+                self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
+                                 activity, e.ddmendpoint_alt, e.ddmendpoint)

         return files
diff --git a/pilot/control/data.py b/pilot/control/data.py
index 3c76a9b9..28db36a3 100644
--- a/pilot/control/data.py
+++ b/pilot/control/data.py
@@ -285,11 +285,13 @@ def _stage_in(args: object, job: JobData) -> bool:
             logger.info('stage-in will not be done in a container')

         client, activity = get_stagein_client(job, args, label)
+        logger.info(f'activity={activity}')
         use_pcache = job.infosys.queuedata.use_pcache
-
+        logger.debug(f'use_pcache={use_pcache}')
         # get the proper input file destination (normally job.workdir unless stager workflow)
         jobworkdir = job.workdir  # there is a distinction for mv copy tool on ND vs non-ATLAS
         workdir = get_proper_input_destination(job.workdir, args.input_destination_dir)
+        logger.debug(f'workdir={workdir}')
         kwargs = {'workdir': workdir,
                   'cwd': job.workdir,
                   'usecontainer': False,
@@ -301,7 +303,9 @@ def _stage_in(args: object, job: JobData) -> bool:
                   'rucio_host': args.rucio_host,
                   'jobworkdir': jobworkdir,
                   'args': args}
+        logger.debug(f'kwargs={kwargs}')
         client.prepare_sources(job.indata)
+        logger.info('prepared sources - will now transfer files')
         client.transfer(job.indata, activity=activity, **kwargs)
     except PilotException as error:
         error_msg = traceback.format_exc()
@@ -909,14 +913,33 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:

         # create the trace report
         trace_report = create_trace_report(job, label=label)

-        client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir, altstageout=job.altstageout)
+        client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir)
         kwargs = {'workdir': job.workdir,
                   'cwd': job.workdir,
                   'usecontainer': False,
                   'job': job,
                   'output_dir': args.output_dir,
                   'catchall': job.infosys.queuedata.catchall,
                   'rucio_host': args.rucio_host}  #, mode='stage-out')
+        is_unified = job.infosys.queuedata.type == 'unified'
         # prod analy unification: use destination preferences from PanDA server for unified queues
-        if job.infosys.queuedata.type != 'unified':
+        if not is_unified:
             client.prepare_destinations(xdata, activity)  ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
-        client.transfer(xdata, activity, **kwargs)
+
+        altstageout = not is_unified and job.allow_altstageout()  # do not use alt stage-out for unified queues
+        client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
+        remain_files = [entry for entry in xdata if entry.require_transfer()]
+        # check if alt stage-out can be applied (all remaining files must have an alternative storage declared in ddmendpoint_alt)
+        has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files)
+
+        logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s',
+                    activity, is_unified, altstageout, len(remain_files), has_altstorage)
+
+        if altstageout and remain_files and has_altstorage:  # apply alternative stage-out for failed transfers
+            for entry in remain_files:
+                entry.ddmendpoint = entry.ddmendpoint_alt
+                entry.ddmendpoint_alt = None
+                entry.is_altstaged = True
+
+            logger.info('alt stage-out will be applied for %s remaining files (previously failed)', len(remain_files))
+            client.transfer(xdata, activity, **kwargs)
+
     except PilotException as error:
         error_msg = traceback.format_exc()
@@ -936,14 +959,10 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
             logger.info(f'switched back proxy on unified dispatch queue: X509_USER_PROXY={x509_org} (reset X509_UNIFIED_DISPATCH)')

     logger.info('summary of transferred files:')
-    for iofile in xdata:
-        if not iofile.status:
-            status = "(not transferred)"
-        else:
-            status = iofile.status
-        logger.info(" -- lfn=%s, status_code=%s, status=%s", iofile.lfn, iofile.status_code, status)
+    for entry in xdata:
+        logger.info(" -- lfn=%s, status_code=%s, status=%s", entry.lfn, entry.status_code, entry.status or "(not transferred)")

-    remain_files = [iofile for iofile in xdata if iofile.status not in ['transferred']]
+    remain_files = [entry for entry in xdata if entry.status not in ['transferred']]

     return not remain_files

@@ -1059,12 +1078,18 @@ def generate_fileinfo(job: JobData) -> dict:
     """
     fileinfo = {}
     checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum'
-    for iofile in job.outdata + job.logdata:
-        if iofile.status in {'transferred'}:
-            fileinfo[iofile.lfn] = {'guid': iofile.guid,
-                                    'fsize': iofile.filesize,
-                                    f'{checksum_type}': iofile.checksum.get(config.File.checksum_type),
-                                    'surl': iofile.turl}
+    for entry in job.outdata + job.logdata:
+        if entry.status in {'transferred'}:
+            dat = {
+                'guid': entry.guid,
+                'fsize': entry.filesize,
+                f'{checksum_type}': entry.checksum.get(config.File.checksum_type),
+                'surl': entry.turl
+            }
+            if entry.is_altstaged:
+                dat['ddmendpoint'] = entry.ddmendpoint
+
+            fileinfo[entry.lfn] = dat

     return fileinfo
diff --git a/pilot/control/job.py b/pilot/control/job.py
index d01d9b45..f9b80c25 100644
--- a/pilot/control/job.py
+++ b/pilot/control/job.py
@@ -2365,7 +2365,7 @@ def create_job(dispatcher_response: dict, queuename: str) -> Any:
     job = JobData(dispatcher_response)
     jobinfosys = InfoService()
     jobinfosys.init(queuename, infosys.confinfo, infosys.extinfo, JobInfoProvider(job))
-    job.init(infosys)
+    job.init(jobinfosys)

     logger.info(f'received job: {job.jobid} (sleep until the job has finished)')
diff --git a/pilot/control/payload.py b/pilot/control/payload.py
index 25adce15..7d091073 100644
--- a/pilot/control/payload.py
+++ b/pilot/control/payload.py
@@ -376,13 +376,21 @@ def extract_error_info(error: str) -> (int, str):
     return error_code, diagnostics


-def get_rtlogging() -> str:
+def get_rtlogging(catchall: str) -> str:
     """
-    Return the proper rtlogging value from the experiment specific plug-in or the config file.
+    Return the proper rtlogging value from PQ.catchall, the experiment-specific plug-in, or the config file.

+    :param catchall: catchall field from queuedata (str)
     :return: rtlogging (str).
     """
+    if catchall:
+        _rtlogging = findall(r'logging=([^,]+)', catchall)
+        if _rtlogging and ";" in _rtlogging[0]:
+            logger.info(f"found rtlogging in catchall: {_rtlogging[0]}")
+            return _rtlogging[0]
+
     rtlogging = None
+
     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
     try:
         user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)
@@ -419,7 +427,13 @@ def get_logging_info(job: JobData, args: object) -> dict:
     info_dic['logname'] = args.realtime_logname if args.realtime_logname else "pilot-log"
     logserver = args.realtime_logging_server if args.realtime_logging_server else ""

-    info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', get_rtlogging())
+    try:
+        catchall = job.infosys.queuedata.catchall
+    except Exception as exc:
+        logger.warning(f'exception caught: {exc}')
+        catchall = ""
+
+    info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', get_rtlogging(catchall))
     if not logserver and not info:
         logger.warning(f"not enough info available for activating real-time logging (info='{info}', logserver='{logserver}')")
         return {}
diff --git a/pilot/info/filespec.py b/pilot/info/filespec.py
index d7e8e906..ef4700ed 100644
--- a/pilot/info/filespec.py
+++ b/pilot/info/filespec.py
@@ -77,6 +77,7 @@ class FileSpec(BaseData):
     is_tar = False  # whether it's a tar file or not
     ddm_activity = None  # DDM activity names (e.g. [read_lan, read_wan]) which should be used to resolve appropriate protocols from StorageData.arprotocols
     checkinputsize = True
+    is_altstaged = None  # indicates whether the file was transferred using an alternative method (alt stage-out)

     # specify the type of attributes for proper data validation and casting
     _keys = {int: ['filesize', 'mtime', 'status_code'],
@@ -216,3 +217,10 @@ def get_storage_id_and_path_convention(self) -> (str, str):
         logger.info(f'storage_id: {storage_id}, path_convention: {path_convention}')

         return storage_id, path_convention
+
+    def require_transfer(self) -> bool:
+        """
+        Check if the file still needs to be transferred (it is in an error state or its transfer has never been started).
+        """
+
+        return self.status not in ['remote_io', 'transferred', 'no_transfer']
diff --git a/pilot/info/jobdata.py b/pilot/info/jobdata.py
index f936dbbf..2cc149be 100644
--- a/pilot/info/jobdata.py
+++ b/pilot/info/jobdata.py
@@ -169,7 +169,7 @@ class JobData(BaseData):
     dask_scheduler_ip = ''  # enhanced job definition for Dask jobs
     jupyter_session_ip = ''  # enhanced job definition for Dask jobs
     minramcount = 0  # minimum number of RAM required by the payload
-    altstageout = None  # alternative stage-out method, on, off, force
+    altstageout = None  # alternative stage-out method: boolean, on (True), off (False)
     # home package string with additional payload release information; does not need to be added to
     # the conversion function since it's already lower case
     homepackage = ""  # home package for TRF
@@ -575,6 +575,26 @@ def has_remoteio(self) -> bool:
         """
         return any(fspec.status == 'remote_io' for fspec in self.indata)

+    def allow_altstageout(self):
+        """
+        Resolve whether alternative stage-out is allowed for the given job, taking `queuedata` settings into account.
+        `queuedata`-specific settings override the job preferences for altstageout.
+
+        :return: boolean.
+        """
+
+        # consider the queue specific settings first (if any)
+        if self.infosys and self.infosys.queuedata:
+            qval = self.infosys.queuedata.altstageout
+            if qval is not None:
+                return qval
+        else:
+            logger.info('job.infosys.queuedata is not initialized: PandaQueue specific settings for altstageout will not be considered')
+
+        # apply additional job specific checks here if needed
+
+        return bool(self.altstageout)
+
     def clean(self):
         """
         Validate and finally clean up required data values (object properties) if needed.
@@ -628,6 +648,19 @@ def clean__platform(self, raw: Any, value: str) -> str:

         return v

+    def clean__altstageout(self, raw: Any, value: str) -> Any:
+        """
+        Verify and validate value for the altstageout key.
+
+        :param raw: raw value (Any)
+        :param value: parsed value (str)
+        :return: cleaned value (bool or None).
+        """
+        if value == 'on':
+            return True
+        if value == 'off':
+            return False
+
     def clean__jobparams(self, raw: Any, value: str) -> str:
         """
         Verify and validate value for the jobparams key.
diff --git a/pilot/info/queuedata.py b/pilot/info/queuedata.py
index 3e3ee1b7..5508c260 100644
--- a/pilot/info/queuedata.py
+++ b/pilot/info/queuedata.py
@@ -96,6 +96,8 @@ class QueueData(BaseData):
     is_cvmfs = True  # has cvmfs installed
     memkillgrace = 100  # memory kill grace value in percentage

+    altstageout = None  # allow altstageout: force (True), disable (False) or no preference (None)
+
     # specify the type of attributes for proper data validation and casting
     _keys = {int: ['timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap', 'corecount', 'maxrss', 'maxtime', 'maxinputsize', 'memkillgrace'],
@@ -171,6 +173,16 @@ def resolve_allowed_schemas(self, activity: str or list, copytool: str = None) -

         return adat.get(copytool) or []

+    def allow_altstageout(self):
+        """
+        Resolve whether alternative stage-out should be forced (True) or disabled (False); None means no preference defined.
+        :return: boolean or None.
+        """
+
+        val = self.params.get('allow_altstageout', None)
+        if val is not None:  # cast to bool
+            return bool(val)
+
     def clean(self):
         """Validate and finally clean up required data values (required object properties) if needed."""
         # validate es_stageout_gap value
@@ -197,6 +209,9 @@ def clean(self):
             self.container_options = self.container_options.replace(" --contain", ",${workdir} --contain")
             logger.info(f"note: added missing $workdir to container_options: {self.container_options}")

+        # set altstageout settings
+        self.altstageout = self.allow_altstageout()
+
     ## custom function pattern to apply extra validation to the key values
     ##def clean__keyname(self, raw, value):
     ##  :param raw: raw value passed from ext source as input
diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py
index c73df92a..27d91384 100644
--- a/pilot/user/atlas/utilities.py
+++ b/pilot/user/atlas/utilities.py
@@ -41,7 +41,10 @@
 )
 from pilot.util.parameters import convert_to_int
 from pilot.util.processes import is_process_running
-from pilot.util.psutils import get_command_by_pid
+from pilot.util.psutils import (
+    get_command_by_pid,
+    find_process_by_jobid
+)
 from .setup import get_asetup

@@ -158,11 +161,9 @@ def get_proper_pid(pid: int, jobid: str, use_container: bool = True) -> int:
     if not is_process_running(pid):
         return -1

-    ps = get_ps_info()
-
-    # lookup the process id using ps aux
+    # look up the process id using psutil, with the ps command as fallback
     logger.debug(f'attempting to identify pid from job id ({jobid})')
-    _pid = get_pid_for_jobid(ps, jobid)
+    _pid = get_pid_for_jobid(jobid)
     if _pid:
         logger.debug(f'discovered pid={_pid} for job id {jobid}')
         cmd = get_command_by_pid(_pid)
@@ -188,6 +189,8 @@ def get_ps_info(whoami: str = None, options: str = 'axfo pid,user,args') -> str:
     """
     Return ps info for the given user.

+    Note: this is a fallback solution in case the pid cannot be found with the psutil lookup.
+
     :param whoami: username (str)
     :param options: ps options (str)
     :return: ps aux for given user (str).
@@ -200,16 +203,19 @@ def get_ps_info(whoami: str = None, options: str = 'axfo pid,user,args') -> str:

     return stdout


-def get_pid_for_jobid(ps: str, jobid: str) -> int or None:
+def get_pid_for_jobid(jobid: str) -> int or None:
     """
     Return the process id for the ps entry that contains the job id.

-    :param ps: ps command output (str)
     :param jobid: PanDA job id (str).
     :return: pid (int) or None if no such process (int or None).
""" - pid = None + pid = find_process_by_jobid(jobid) + if pid: + return pid + # fallback to ps command + ps = get_ps_info() for line in ps.split('\n'): if jobid in line and 'xrootd' not in line: # extract pid diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 0a4003b3..e497600c 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '66' # build number should be reset to '1' for every new development cycle +REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '8' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/https.py b/pilot/util/https.py index 9a2da0d2..a26c75ce 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -646,7 +646,7 @@ def get_panda_server(url: str, port: int, update_server: bool = True) -> str: if default in pandaserver: try: rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])]) - except socket.herror as exc: + except (socket.herror, socket.gaierror) as exc: logger.warning(f'failed to get address from socket: {exc} - will use default server ({pandaserver})') else: pandaserver = pandaserver.replace(default, rnd) diff --git a/pilot/util/networking.py b/pilot/util/networking.py index 5e03368d..d081b784 100644 --- a/pilot/util/networking.py +++ b/pilot/util/networking.py @@ -35,7 +35,7 @@ def dump_ipv6_info() -> None: """Dump the IPv6 info to the log.""" cmd = 'ifconfig' if not is_command_available(cmd): - _cmd = '/usr/sbin/ifconfig' + _cmd = '/usr/sbin/ifconfig -a' if not is_command_available(_cmd): logger.warning(f'command {cmd} is not available - this WN might not support IPv6') return @@ -43,15 +43,35 @@ def dump_ipv6_info() -> None: _, stdout, stderr = execute(cmd, timeout=10) if stdout: - ipv6 = extract_ipv6(stdout) + ipv6 = extract_ipv6_addresses(stdout) if ipv6: logger.info(f'IPv6 addresses: {ipv6}') else: - logger.warning('no IPv6 addresses found - this WN does not support IPv6') + logger.warning('no IPv6 addresses were found') else: logger.warning(f'failed to run ifconfig: {stderr}') +def extract_ipv6_addresses(ifconfig_output: str) -> list: + """Extracts IPv6 addresses from ifconfig output. + + Args: + ifconfig_output: The output of the ifconfig command. + + Returns: + A list of IPv6 addresses. + """ + + ipv6_addresses = [] + for line in ifconfig_output.splitlines(): + line = line.strip().replace("\t", " ").replace("\r", "").replace("\n", "") + match = re.search(r"inet6 (.*?)\s", line) + if match and match.group(1) != "::1": # skip loopback address + ipv6_addresses.append(match.group(1)) + + return ipv6_addresses + + def extract_ipv6(ifconfig: str) -> str: """ Extract the IPv6 address from the ifconfig output. 
diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py
index f9b606e2..eb70f263 100644
--- a/pilot/util/psutils.py
+++ b/pilot/util/psutils.py
@@ -267,3 +267,27 @@ def get_command_by_pid(pid: int) -> str or None:
     except psutil.NoSuchProcess:
         logger.warning(f"process with PID {pid} not found")
         return None
+
+
+def find_process_by_jobid(jobid: int) -> int or None:
+    """
+    Find the process ID of a process whose command arguments contain the given job ID.
+
+    :param jobid: the job ID to search for (int)
+    :return: the process ID of the matching process, or None if no match is found (int or None).
+    """
+    if not _is_psutil_available:
+        logger.warning('find_process_by_jobid(): psutil not available - aborting')
+        return None
+
+    for proc in psutil.process_iter():
+        try:
+            cmd_line = proc.cmdline()
+        except psutil.NoSuchProcess:
+            continue
+
+        for arg in cmd_line:
+            if str(jobid) in arg and 'xrootd' not in arg:
+                return proc.pid
+
+    return None
diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py
index d06984fd..df085a29 100644
--- a/pilot/util/realtimelogger.py
+++ b/pilot/util/realtimelogger.py
@@ -121,7 +121,7 @@ def __init__(self, args: Any, info_dic: dict, workdir: str, secrets: str, level:
         if workdir:  # bypass pylint warning - keep workdir for possible future development
             pass
         if not info_dic:
-            logger.warning('info dictionary not set - add \'logging=type:protocol://host:port\' to PQ.catchall)')
+            logger.warning('info dictionary not set - add \'logging=type;protocol://host:port\' to PQ.catchall)')
             RealTimeLogger.glogger = None
             return
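
Note (not part of the patch): a minimal standalone sketch of the alternative stage-out retry flow introduced above. FakeFileSpec and the simplified transfer() below are illustrative stand-ins rather than the pilot's actual classes; only the retry logic mirrors what _do_stageout() now does with raise_exception, require_transfer() and ddmendpoint_alt.

# illustrative sketch only - simplified stand-ins for FileSpec and StageOutClient.transfer()
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeFileSpec:
    lfn: str
    ddmendpoint: str = ''
    ddmendpoint_alt: Optional[str] = None
    status: Optional[str] = None
    is_altstaged: Optional[bool] = None

    def require_transfer(self) -> bool:
        # same condition as FileSpec.require_transfer() in the patch
        return self.status not in ['remote_io', 'transferred', 'no_transfer']


def transfer(files, raise_exception=True):
    # stand-in for StageOutClient.transfer(): pretend the primary endpoint always fails
    # and only the alternative endpoint succeeds
    for fspec in files:
        fspec.status = 'transferred' if fspec.is_altstaged else 'failed'
    if raise_exception and any(fspec.status != 'transferred' for fspec in files):
        raise RuntimeError('stage-out failed')
    return files


xdata = [FakeFileSpec('file.root', ddmendpoint='SITE_DATADISK', ddmendpoint_alt='ALT_DATADISK')]
altstageout = True  # would come from job.allow_altstageout() on a non-unified queue

# first attempt: do not raise, so the alt stage-out logic can inspect the leftover files
transfer(xdata, raise_exception=not altstageout)

remain_files = [f for f in xdata if f.require_transfer()]
has_altstorage = all(f.ddmendpoint_alt and f.ddmendpoint != f.ddmendpoint_alt for f in remain_files)

if altstageout and remain_files and has_altstorage:
    for f in remain_files:
        # switch each failed file to its alternative endpoint and mark it as alt-staged
        f.ddmendpoint, f.ddmendpoint_alt, f.is_altstaged = f.ddmendpoint_alt, None, True
    transfer(xdata)  # second attempt against the alternative endpoint

print([(f.lfn, f.status, f.ddmendpoint, f.is_altstaged) for f in xdata])
# expected: [('file.root', 'transferred', 'ALT_DATADISK', True)]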