diff --git a/PILOTVERSION b/PILOTVERSION index 6b5cb158d..7858b9c11 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.5.0.31 \ No newline at end of file +3.5.1.17 \ No newline at end of file diff --git a/pilot/control/monitor.py b/pilot/control/monitor.py index 32a7ca24a..00de98a1b 100644 --- a/pilot/control/monitor.py +++ b/pilot/control/monitor.py @@ -23,6 +23,7 @@ from pilot.util.config import config from pilot.util.constants import MAX_KILL_WAIT_TIME # from pilot.util.container import execute +from pilot.util.features import MachineFeatures from pilot.util.queuehandling import get_queuedata_from_job, abort_jobs_in_queues from pilot.util.timing import get_time_since_start @@ -31,7 +32,7 @@ # Monitoring of threads functions -def control(queues, traces, args): +def control(queues, traces, args): # noqa: C901 """ Main control function, run from the relevant workflow module. @@ -50,6 +51,7 @@ def control(queues, traces, args): # for CPU usage debugging cpuchecktime = int(config.Pilot.cpu_check) tcpu = t_0 + last_minute_check = t_0 queuedata = get_queuedata_from_job(queues) max_running_time = get_max_running_time(args.lifetime, queuedata) @@ -59,7 +61,7 @@ def control(queues, traces, args): niter = 0 while not args.graceful_stop.is_set(): - # every seconds, run the monitoring checks + # every few seconds, run the monitoring checks if args.graceful_stop.wait(1) or args.graceful_stop.is_set(): logger.warning('aborting monitor loop since graceful_stop has been set') break @@ -83,6 +85,16 @@ def control(queues, traces, args): else: if niter % 60 == 0: logger.info(f'{time_since_start}s have passed since pilot start') + + # every minute run the following check + if time.time() - last_minute_check > 60: + reached_maxtime = run_shutdowntime_minute_check(time_since_start) + if reached_maxtime: + reached_maxtime_abort(args) + break + last_minute_check = time.time() + + # take a nap time.sleep(1) # time to check the CPU usage? 
@@ -116,6 +128,50 @@ def control(queues, traces, args): logger.info('[monitor] control thread has ended') +def run_shutdowntime_minute_check(time_since_start): + """ + Run checks on machine features shutdowntime once a minute. + + :param time_since_start: how many seconds have elapsed since the pilot started (int). + :return: True if reached max time, False if not (or if shutdowntime not known) (Boolean). + """ + + # check machine features if present for shutdowntime + machinefeatures = MachineFeatures().get() + if machinefeatures: + grace_time = 10 * 60 + try: + now = int(time.time()) + except (TypeError, ValueError) as exc: + logger.warning(f'failed to read current time: {exc}') + return False # will be ignored + + # ignore shutdowntime if not known + try: + shutdowntime = int(machinefeatures.get('shutdowntime')) + except (TypeError, ValueError) as exc: + logger.debug(f'failed to convert shutdowntime: {exc}') + return False # will be ignored + logger.debug(f'machinefeatures shutdowntime={shutdowntime} - now={now}') + if not shutdowntime: + logger.debug('ignoring shutdowntime since it is not set') + return False # will be ignored + + # ignore shutdowntime if in the past (= set before the pilot started) + if shutdowntime < (now - time_since_start): + logger.debug(f'shutdowntime ({shutdowntime}) was set before pilot started - ignore it ' + f'(now - time since start = {now - time_since_start})') + return False # will be ignored + + # did we pass, or are we close to the shutdowntime? + if now > shutdowntime - grace_time: + logger.fatal(f'now={now}s - shutdowntime ({shutdowntime}s) minus grace time ({grace_time}s) has been ' + f'exceeded - time to abort pilot') + return True + + return False + + def reached_maxtime_abort(args): """ Max time has been reached, set REACHED_MAXTIME and graceful_stop, close any ActiveMQ connections. 
diff --git a/pilot/copytool/gs.py b/pilot/copytool/gs.py index 505da28de..6f8a2e743 100644 --- a/pilot/copytool/gs.py +++ b/pilot/copytool/gs.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2021-2023 # - Shuwei import os @@ -18,7 +18,10 @@ try: from google.cloud import storage except Exception: - pass + storage_client = None +else: + storage_client = storage.Client() + try: import pathlib # Python 3 except Exception: @@ -127,10 +130,9 @@ def download_file(path, surl, object_name=None): object_name = os.path.basename(path) try: - client = storage.Client() target = pathlib.Path(object_name) with target.open(mode="wb") as downloaded_file: - client.download_blob_to_file(surl, downloaded_file) + storage_client.download_blob_to_file(surl, downloaded_file) except Exception as error: diagnostics = 'exception caught in gs client: %s' % error logger.critical(diagnostics) @@ -234,8 +236,7 @@ def upload_file(file_name, bucket, object_name=None, content_type=None): # upload the file try: - client = storage.Client() - gs_bucket = client.get_bucket(bucket) + gs_bucket = storage_client.get_bucket(bucket) # remove any leading slash(es) in object_name object_name = object_name.lstrip('/') logger.info('uploading a file to bucket=%s in full path=%s in content_type=%s', bucket, object_name, content_type) diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index e24c27367..aa90c450f 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -214,7 +214,7 @@ def open_remote_files(indata, workdir, nthreads): cmd = create_root_container_command(workdir, _cmd) show_memory_usage() - timeout = len(indata) * 30 + 600 + timeout = get_timeout_for_remoteio(indata) logger.info('executing file open verification script (timeout=%d):\n\n\'%s\'\n\n', timeout, cmd) exitcode, stdout, stderr = execute(cmd, usecontainer=False, timeout=timeout) @@ -252,6 
+252,18 @@ def open_remote_files(indata, workdir, nthreads): return exitcode, diagnostics, not_opened +def get_timeout_for_remoteio(indata): + """ + Calculate a proper timeout to be used for remote i/o files. + + :param indata: indata object. + :return: timeout in seconds (int). + """ + + remote_io = [fspec for fspec in indata if fspec.status == 'remote_io'] + return len(remote_io) * 30 + 600 + + def parse_remotefileverification_dictionary(workdir): """ Verify that all files could be remotely opened. diff --git a/pilot/user/atlas/jobmetrics.py b/pilot/user/atlas/jobmetrics.py index bfa208ead..26652f1a6 100644 --- a/pilot/user/atlas/jobmetrics.py +++ b/pilot/user/atlas/jobmetrics.py @@ -113,7 +113,7 @@ def add_sub_features(job_metrics, features_dic, add=[]): # correct hs06 for corecount: hs06*perf_scale/total_cpu*corecount hs06 = machinefeatures.get('hs06', 0) total_cpu = machinefeatures.get('total_cpu', 0) - if hs06 and total_cpu: + if hs06 and total_cpu and (total_cpu != '0' and total_cpu != 0): perf_scale = 1 try: machinefeatures_hs06 = 1.0 * int(float(hs06)) * perf_scale * corecount / (1.0 * int(float(total_cpu))) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 05d843991..76bf639dd 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -13,8 +13,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '5' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '31' # build number should be reset to '1' for every new development cycle +REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '17' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/features.py b/pilot/util/features.py index 8d5635cb3..a3fd9f9c7 
100644 --- a/pilot/util/features.py +++ b/pilot/util/features.py @@ -60,8 +60,12 @@ def set(self, path, label): if path and os.path.exists(path): data_members = self.get_data_members() for member in data_members: + # ignore if file doesn't exist + filename = os.path.join(path, member) + if not os.path.exists(filename): + continue try: - value = read_file(os.path.join(path, member)) + value = read_file(filename) except FileHandlingFailure as exc: logger.warning(f'failed to process {member}: {exc}') value = None diff --git a/pilot/util/jobmetrics.py b/pilot/util/jobmetrics.py index 483a98380..a4f6be223 100644 --- a/pilot/util/jobmetrics.py +++ b/pilot/util/jobmetrics.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023 from os import environ @@ -16,7 +16,7 @@ def get_job_metrics_entry(name, value): """ Get a formatted job metrics entry. - Return a a job metrics substring with the format 'name=value ' (return empty entry if value is not set). + Return a job metrics substring with the format 'name=value ' (return empty entry if value is not set). :param name: job metrics parameter name (string). :param value: job metrics parameter value (string). 
diff --git a/pilot/util/timing.py b/pilot/util/timing.py index f73b95c8e..f0f2f25ee 100644 --- a/pilot/util/timing.py +++ b/pilot/util/timing.py @@ -272,7 +272,7 @@ def get_time_since(job_id, timing_constant, args): if time_measurement_dictionary: time_measurement = get_time_measurement(timing_constant, time_measurement_dictionary, args.timing) if time_measurement: - diff = time.time() - time_measurement + diff = int(time.time() - time_measurement) else: logger.warning(f'failed to extract time measurement dictionary from {args.timing}') else: diff --git a/pilot/util/workernode.py b/pilot/util/workernode.py index 87202c32a..a1a33696b 100644 --- a/pilot/util/workernode.py +++ b/pilot/util/workernode.py @@ -5,11 +5,12 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2023 import os import re import logging +from shutil import which #from subprocess import getoutput @@ -253,6 +254,27 @@ def get_cpu_model(): return modelstring +def lscpu(): + """ + Execute lscpu command. + + :return: exit code (int), stdout (string). + """ + + cmd = 'lscpu' + if not which(cmd): + logger.warning(f'command={cmd} does not exist - cannot check number of available cores') + return 1, "" + + ec, stdout, _ = execute(cmd) + if isinstance(stdout, bytes): + stdout = stdout.decode("utf-8") + + logger.debug(f'lscpu:\n{stdout}') + + return ec, stdout + + def get_cpu_cores(modelstring): """ Get core count from /proc/cpuinfo and update modelstring (CPU model). 
@@ -263,29 +285,51 @@ def get_cpu_cores(modelstring): """ number_of_cores = 0 - re_cores = re.compile(r'^cpu cores\s+:\s+(\d+)') - with open("/proc/cpuinfo", "r") as _fp: + ec, stdout = lscpu() + if ec: + return modelstring - # loop over all lines in cpuinfo - for line in _fp.readlines(): + cores_per_socket = 0 + sockets = 0 + for line in stdout.split('\n'): - # try to grab core count from current line - cores = re_cores.search(line) - if cores: - # found core count - try: - number_of_cores += int(cores.group(1)) - except Exception: - pass - - if number_of_cores > 0 and '-Core' not in modelstring: - if 'Core Processor' in modelstring: - modelstring = modelstring.replace('Core', '%d-Core' % number_of_cores) - elif 'Processor' in modelstring: - modelstring = modelstring.replace('Processor', '%d-Core Processor' % number_of_cores) - else: - modelstring += ' %d-Core Processor' + try: + pattern = r'Core\(s\)\ per\ socket\:\ +(\d+)' + _cores = re.findall(pattern, line) + if _cores: + cores_per_socket = int(_cores[0]) + continue + except Exception as exc: + logger.warning(f'exception caught: {exc}') + + try: + pattern = r'Socket\(s\)\:\ +(\d+)' + _sockets = re.findall(pattern, line) + if _sockets: + sockets = int(_sockets[0]) + break + except Exception as exc: + logger.warning(f'exception caught: {exc}') + + if cores_per_socket and sockets: + number_of_cores = cores_per_socket * sockets + logger.info(f'found {number_of_cores} cores ({cores_per_socket} cores per socket, {sockets} sockets)') + + logger.debug(f'current model string: {modelstring}') + if number_of_cores > 0: + if '-Core Processor' in modelstring: # NN-Core info already in string - update it + pattern = r'(\d+)\-Core Processor' + _nn = re.findall(pattern, modelstring) + if _nn: + modelstring = modelstring.replace(f'{_nn[0]}-Core', f'{number_of_cores}-Core') + elif 'Core Processor' in modelstring: + modelstring = modelstring.replace('Core', '%d-Core' % number_of_cores) + elif 
'Processor' in modelstring: + modelstring = modelstring.replace('Processor', '%d-Core Processor' % number_of_cores) + else: + modelstring += ' %d-Core Processor' % number_of_cores + logger.debug(f'updated model string: {modelstring}') return modelstring diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 000000000..c5cd10174 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,3 @@ +# config file for pypi +[metadata] +description-file = README.md diff --git a/setup.py b/setup.py index e857fc654..fac7c5feb 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ # # Authors: # - Fernando Barreiro Megino, fernando.harald.barreiro.megino@cern.ch, 2019 -# - Paul Nilsson, paul.nilsson@cern.ch, 2019-2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2019-2023 import sys from setuptools import setup, find_packages @@ -26,10 +26,11 @@ author='PanDA Team', author_email='atlas-adc-panda@cern.ch', url='https://github.com/PanDAWMS/pilot3/wiki', - python_requires='>=2.7', + python_requires='>=3.6', packages=find_packages(), install_requires=[], data_files=[], + package_data={'': ['PILOTVERSION']}, include_package_data=True, - scripts=['pilot.py'] + scripts=[] ) \ No newline at end of file