diff --git a/PILOTVERSION b/PILOTVERSION index 27d4f6b57..42ab9ff9b 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.1.0.63 \ No newline at end of file +3.1.1.10 \ No newline at end of file diff --git a/pilot.py b/pilot.py index f774aad07..e99ccfd4f 100755 --- a/pilot.py +++ b/pilot.py @@ -7,7 +7,7 @@ # Authors: # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022 from __future__ import print_function # Python 2 (2to3 complains about this) from __future__ import absolute_import @@ -196,7 +196,7 @@ def get_args(): default=True, help='Disable server updates') - arg_parser.add_argument('-t', '--verifyproxy', + arg_parser.add_argument('-t', '--noproxyverification', dest='verify_proxy', action='store_false', default=True, diff --git a/pilot/api/analytics.py b/pilot/api/analytics.py index 8af4f4e12..34f1252e8 100644 --- a/pilot/api/analytics.py +++ b/pilot/api/analytics.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 from .services import Services from pilot.common.exception import NotDefined, NotSameLength, UnknownException @@ -138,6 +138,13 @@ def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision= # remove tails if desired # this is useful e.g. for memory monitor data where the first and last values # represent allocation and de-allocation, ie not interesting + + itmet = False + if len(x) >= 100: + logger.debug('tails will not be removed for large data sample - iterative method will be used instead') + tails = True + itmet = True + if not tails and len(x) > 7 and len(y) > 7: logger.debug('removing tails from data to be fitted') x = x[5:] @@ -145,24 +152,94 @@ def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision= y = y[5:] y = y[:-2] - if (len(x) > 7 and len(y) > 7) and len(x) == len(y): + if not (len(x) > 7 and len(y) > 7) and len(x) == len(y): + logger.warning('wrong length of table data, x=%s, y=%s (must be same and length>=4)', str(x), str(y)) + else: logger.info('fitting %s vs %s', y_name, x_name) + + if itmet: + norg = len(x) + fit = self.fit(x, y) + _slope = self.slope() + _chi2_org = fit.chi2() + + # determine the removable right region ("right side limit") + _x = x + _y = y + right_limit = self.find_limit(_x, _y, _chi2_org, norg, edge="right") + + # determine the removable left region ("left side limit") + _x = x + _y = y + left_limit = self.find_limit(_x, _y, _chi2_org, norg, edge="left") + + # final fit adjusted for removable regions + x = x[left_limit:right_limit] + y = y[left_limit:right_limit] + try: fit = self.fit(x, y) _slope = self.slope() - except Exception as e: - logger.warning('failed to fit data, x=%s, y=%s: %s', str(x), str(y), e) + except Exception as exc: + logger.warning('failed to fit data, x=%s, y=%s: %s', str(x), str(y), exc) else: if _slope: slope = float_to_rounded_string(fit.slope(), precision=precision) chi2 = float_to_rounded_string(fit.chi2(), precision=precision) if slope != "": logger.info('current memory leak: %s B/s (using %d data points, chi2=%s)', slope, len(x), chi2) - else: - logger.warning('wrong length of table data, x=%s, y=%s (must be same and length>=4)', str(x), str(y)) return {"slope": slope, "chi2": chi2} + def find_limit(self, _x, _y, _chi2_org, norg, change_limit=0.25, edge="right", steps=5): + """ + Use an iterative approach to find the limits of the distributions that can be used for the final fit. + """ + + limit = 0 + _chi2_prev = _chi2_org + found = False + iterations = 0 + while len(_x) > 2 * norg / 3: + iterations += 1 + if edge == "right": + _x = _x[:-steps] + _y = _y[:-steps] + else: # left edge + _x = _x[steps:] + _y = _y[steps:] + try: + fit = self.fit(_x, _y) + except Exception as exc: + logger.warning(f'caught exception: {exc}') + break + + _chi2 = fit.chi2() + change = (_chi2_prev - _chi2) / _chi2_prev + logger.info(f'current chi2={_chi2} (change={change * 100} %)') + if change < change_limit: + found = True + break + else: + _chi2_prev = _chi2 + + if edge == "right": + if not found: + limit = norg - 1 + logger.warning('right removable region not found') + else: + limit = len(_x) - 1 + logger.info(f'right removable region: {limit}') + else: + if not found: + limit = 0 + logger.info('left removable region not found') + else: + limit = iterations * 10 + logger.info(f'left removable region: {limit}') + + return limit + def extract_from_table(self, table, x_name, y_name): """ diff --git a/pilot/control/payload.py b/pilot/control/payload.py index 243546375..878d62d47 100644 --- a/pilot/control/payload.py +++ b/pilot/control/payload.py @@ -348,8 +348,8 @@ def get_logging_info(realtimelogging, catchall, realtime_logname, realtime_loggi info_dic = {} if 'logging=' not in catchall or not realtimelogging: - logger.debug(f'catchall={catchall}') - logger.debug(f'realtimelogging={realtimelogging}') + #logger.debug(f'catchall={catchall}') + #logger.debug(f'realtimelogging={realtimelogging}') return {} # args handling @@ -424,7 +424,7 @@ def run_realtimelog(queues, traces, args): job.realtimelogging = True # testing - job.realtimelogging = True + # job.realtimelogging = True # reset info_dic if real-time logging is not wanted by the job if not job.realtimelogging: info_dic = None @@ -433,7 +433,7 @@ def run_realtimelog(queues, traces, args): job.infosys.queuedata.catchall, args.realtime_logname, args.realtime_logging_server) if not info_dic and job.realtimelogging else info_dic - logger.debug(f'info_dic={info_dic}') + # logger.debug(f'info_dic={info_dic}') if info_dic: args.use_realtime_logging = True realtime_logger = get_realtime_logger(args, info_dic) diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 2e4e6271e..1a71734c6 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -1887,6 +1887,7 @@ def get_redundants(): "singularity/*", "/cores", "/work", + "*.part*", "docs/", "/pilot3"] diff --git a/pilot/user/atlas/loopingjob_definitions.py b/pilot/user/atlas/loopingjob_definitions.py index 21488a7cf..1a368b15b 100644 --- a/pilot/user/atlas/loopingjob_definitions.py +++ b/pilot/user/atlas/loopingjob_definitions.py @@ -5,7 +5,9 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 + +from os.path import join def allow_loopingjob_detection(): @@ -34,6 +36,7 @@ def remove_unwanted_files(workdir, files): _files = [] for _file in files: if not (workdir == _file or + _file == join(workdir, 'workDir') or "prmon" in _file or "/pilot" in _file or "/pandawnutil" in _file or diff --git a/pilot/user/atlas/proxy.py b/pilot/user/atlas/proxy.py index 2f5496aa6..2aec15b72 100644 --- a/pilot/user/atlas/proxy.py +++ b/pilot/user/atlas/proxy.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 # - Alexander Bogdanchikov, Alexander.Bogdanchikov@cern.ch, 2020 # from pilot.util.container import execute @@ -13,7 +13,7 @@ import os import logging -from pilot.user.atlas.setup import get_file_system_root_path +# from pilot.user.atlas.setup import get_file_system_root_path from pilot.util.container import execute from pilot.common.errorcodes import ErrorCodes from time import time @@ -44,11 +44,11 @@ def verify_proxy(limit=None, x509=None, proxy_id="pilot", test=False): else: envsetup = '' - envsetup += ". %s/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh --quiet;" % get_file_system_root_path() - if os.environ.get('ALRB_noGridMW', '').lower() != "yes": - envsetup += "lsetup emi;" - else: - logger.warning('Skipping "lsetup emi" as ALRB_noGridMW=YES') + # envsetup += ". %s/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh --quiet;" % get_file_system_root_path() + #if os.environ.get('ALRB_noGridMW', '').lower() != "yes": + # envsetup += "lsetup emi;" + #else: + # logger.warning('Skipping "lsetup emi" as ALRB_noGridMW=YES') # first try to use arcproxy since voms-proxy-info is not working properly on SL6 # (memory issues on queues with limited memory) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 34644997b..5591745d2 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -6,15 +6,15 @@ # # Authors: # - Mario Lassnig, mario.lassnig@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 from os import environ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '1' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '63' # build number should be reset to '1' for every new development cycle +REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '10' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/math.py b/pilot/util/math.py index 33ec2e2d1..3449eecec 100644 --- a/pilot/util/math.py +++ b/pilot/util/math.py @@ -82,7 +82,7 @@ def chi2(observed, expected): if 0 in expected: return 0.0 - return sum((_o - _e) ** 2 / _e for _o, _e in zip(observed, expected)) + return sum((_o - _e) ** 2 / _e ** 2 for _o, _e in zip(observed, expected)) def float_to_rounded_string(num, precision=3): diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 436672a16..057701b13 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 # This module contains implementations of job monitoring tasks @@ -631,7 +631,7 @@ def get_max_allowed_work_dir_size(): # grace margin, as discussed in https://its.cern.ch/jira/browse/ATLASPANDA-482 margin = 10.0 # percent, read later from somewhere maxwdirsize = int(maxwdirsize * (1 + margin / 100.0)) - logger.info(f"work directory size check will use {maxwdirsize} B as a max limit (10%% grace limit added)") + logger.info(f"work directory size check will use {maxwdirsize} B as a max limit (10% grace limit added)") return maxwdirsize @@ -692,6 +692,10 @@ def check_output_file_sizes(job): max_fsize = human2bytes(config.Pilot.maximum_output_file_size) if fsize and fsize < max_fsize: logger.info(f'output file {path} is within allowed size limit ({fsize} B < {max_fsize} B)') + elif fsize == 0: + exit_code = errors.EMPTYOUTPUTFILE + diagnostics = f'zero size output file detected: {path}' + logger.warning(diagnostics) else: exit_code = errors.OUTPUTFILETOOLARGE diagnostics = f'output file {path} is not within allowed size limit ({fsize} B > {max_fsize} B)' diff --git a/pilot/util/processes.py b/pilot/util/processes.py index a158d3f98..14342438a 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 import os import time @@ -523,9 +523,12 @@ def get_instant_cpu_consumption_time(pid): if pid and hz and hz > 0: path = "/proc/%d/stat" % pid if os.path.exists(path): - with open(path) as fp: - fields = fp.read().split(' ')[13:17] - utime, stime, cutime, cstime = [(float(f) / hz) for f in fields] + try: + with open(path) as fp: + fields = fp.read().split(' ')[13:17] + utime, stime, cutime, cstime = [(float(f) / hz) for f in fields] + except (FileNotFoundError, IOError) as exc: + logger.warning(f'exception caught: {exc} (ignored)') if utime and stime and cutime and cstime: # sum up all the user+system times for both the main process (pid) and the child processes diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index c6f569bea..4649294b8 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -107,18 +107,16 @@ def __init__(self, args, info_dic, level=INFO): server, port, timeout=5.0, - ssl_enable=False, + ssl_enable=True, ssl_verify=False, - user='pilot', + username='pilot', password='XXX' ) # Create the handler _handler = AsynchronousLogstashHandler( - host=server, - port=port, + server, + port, transport=transport, - ssl_enable=False, - ssl_verify=False, database_path='logstash_test.db') else: logger.warning(f'unknown logtype: {logtype}') @@ -137,7 +135,8 @@ def set_jobinfo(self, job): self.jobinfo = {"TaskID": job.taskid, "PandaJobID": job.jobid} if 'HARVESTER_WORKER_ID' in os.environ: self.jobinfo["Harvester_WorkerID"] = os.environ.get('HARVESTER_WORKER_ID') - logger.debug('set_jobinfo with PandaJobID=%s', self.jobinfo["PandaJobID"]) + if 'HARVESTER_ID' in os.environ: + self.jobinfo["Harvester_ID"] = os.environ.get('HARVESTER_ID') # prepend some panda job info # check if the msg is a dict-based object via isinstance(msg,dict),