Skip to content

Commit

Permalink
Merge pull request #13 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.1.1.10
  • Loading branch information
PalNilsson authored Jan 25, 2022
2 parents 9af04c2 + 52e104b commit f2ce9f3
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 38 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.1.0.63
3.1.1.10
4 changes: 2 additions & 2 deletions pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# Authors:
# - Mario Lassnig, [email protected], 2016-2017
# - Daniel Drizhuk, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2021
# - Paul Nilsson, [email protected], 2017-2022

from __future__ import print_function # Python 2 (2to3 complains about this)
from __future__ import absolute_import
Expand Down Expand Up @@ -196,7 +196,7 @@ def get_args():
default=True,
help='Disable server updates')

arg_parser.add_argument('-t', '--verifyproxy',
arg_parser.add_argument('-t', '--noproxyverification',
dest='verify_proxy',
action='store_false',
default=True,
Expand Down
89 changes: 83 additions & 6 deletions pilot/api/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2018-2021
# - Paul Nilsson, [email protected], 2018-2022

from .services import Services
from pilot.common.exception import NotDefined, NotSameLength, UnknownException
Expand Down Expand Up @@ -138,31 +138,108 @@ def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=
# remove tails if desired
# this is useful e.g. for memory monitor data where the first and last values
# represent allocation and de-allocation, ie not interesting

itmet = False
if len(x) >= 100:
logger.debug('tails will not be removed for large data sample - iterative method will be used instead')
tails = True
itmet = True

if not tails and len(x) > 7 and len(y) > 7:
logger.debug('removing tails from data to be fitted')
x = x[5:]
x = x[:-2]
y = y[5:]
y = y[:-2]

if (len(x) > 7 and len(y) > 7) and len(x) == len(y):
if not (len(x) > 7 and len(y) > 7) and len(x) == len(y):
logger.warning('wrong length of table data, x=%s, y=%s (must be same and length>=4)', str(x), str(y))
else:
logger.info('fitting %s vs %s', y_name, x_name)

if itmet:
norg = len(x)
fit = self.fit(x, y)
_slope = self.slope()
_chi2_org = fit.chi2()

# determine the removable right region ("right side limit")
_x = x
_y = y
right_limit = self.find_limit(_x, _y, _chi2_org, norg, edge="right")

# determine the removable left region ("left side limit")
_x = x
_y = y
left_limit = self.find_limit(_x, _y, _chi2_org, norg, edge="left")

# final fit adjusted for removable regions
x = x[left_limit:right_limit]
y = y[left_limit:right_limit]

try:
fit = self.fit(x, y)
_slope = self.slope()
except Exception as e:
logger.warning('failed to fit data, x=%s, y=%s: %s', str(x), str(y), e)
except Exception as exc:
logger.warning('failed to fit data, x=%s, y=%s: %s', str(x), str(y), exc)
else:
if _slope:
slope = float_to_rounded_string(fit.slope(), precision=precision)
chi2 = float_to_rounded_string(fit.chi2(), precision=precision)
if slope != "":
logger.info('current memory leak: %s B/s (using %d data points, chi2=%s)', slope, len(x), chi2)
else:
logger.warning('wrong length of table data, x=%s, y=%s (must be same and length>=4)', str(x), str(y))

return {"slope": slope, "chi2": chi2}

def find_limit(self, _x, _y, _chi2_org, norg, change_limit=0.25, edge="right", steps=5):
    """
    Use an iterative approach to find the limits of the distributions that can be used for the final fit.

    Data points are trimmed off the chosen edge ('right' or 'left') in chunks of
    `steps` points until the relative chi2 improvement between consecutive fits
    drops below `change_limit`, or until less than two thirds of the original
    sample would remain.

    :param _x: x-values (list).
    :param _y: y-values (list, same length as _x).
    :param _chi2_org: chi2 of the fit over the full sample (float).
    :param norg: original number of data points (int).
    :param change_limit: minimum relative chi2 improvement required to keep trimming (float).
    :param edge: which edge to trim, 'right' or 'left' (string).
    :param steps: number of points removed per iteration (int).
    :return: slice limit into the original arrays (int).
    """

    limit = 0
    _chi2_prev = _chi2_org
    found = False
    iterations = 0

    # keep at least two thirds of the original sample
    while len(_x) > 2 * norg / 3:
        iterations += 1
        if edge == "right":
            _x = _x[:-steps]
            _y = _y[:-steps]
        else:  # left edge
            _x = _x[steps:]
            _y = _y[steps:]
        try:
            fit = self.fit(_x, _y)
        except Exception as exc:
            logger.warning(f'caught exception: {exc}')
            break

        _chi2 = fit.chi2()
        # guard against a perfect previous fit (chi2=0): treat as converged
        # instead of raising ZeroDivisionError
        change = (_chi2_prev - _chi2) / _chi2_prev if _chi2_prev else 0.0
        logger.info(f'current chi2={_chi2} (change={change * 100} %)')
        if change < change_limit:
            found = True
            break
        else:
            _chi2_prev = _chi2

    if edge == "right":
        if not found:
            limit = norg - 1
            logger.warning('right removable region not found')
        else:
            # limit is relative to the trimmed sample length
            limit = len(_x) - 1
            logger.info(f'right removable region: {limit}')
    else:
        if not found:
            limit = 0
            logger.info('left removable region not found')
        else:
            # NOTE(review): hard-coded factor of 10 rather than iterations * steps
            # (default steps=5) - confirm this is intended when steps != 10
            limit = iterations * 10
            logger.info(f'left removable region: {limit}')

    return limit

def extract_from_table(self, table, x_name, y_name):
"""
Expand Down
8 changes: 4 additions & 4 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ def get_logging_info(realtimelogging, catchall, realtime_logname, realtime_loggi
info_dic = {}

if 'logging=' not in catchall or not realtimelogging:
logger.debug(f'catchall={catchall}')
logger.debug(f'realtimelogging={realtimelogging}')
#logger.debug(f'catchall={catchall}')
#logger.debug(f'realtimelogging={realtimelogging}')
return {}

# args handling
Expand Down Expand Up @@ -424,7 +424,7 @@ def run_realtimelog(queues, traces, args):
job.realtimelogging = True

# testing
job.realtimelogging = True
# job.realtimelogging = True
# reset info_dic if real-time logging is not wanted by the job
if not job.realtimelogging:
info_dic = None
Expand All @@ -433,7 +433,7 @@ def run_realtimelog(queues, traces, args):
job.infosys.queuedata.catchall,
args.realtime_logname,
args.realtime_logging_server) if not info_dic and job.realtimelogging else info_dic
logger.debug(f'info_dic={info_dic}')
# logger.debug(f'info_dic={info_dic}')
if info_dic:
args.use_realtime_logging = True
realtime_logger = get_realtime_logger(args, info_dic)
Expand Down
1 change: 1 addition & 0 deletions pilot/user/atlas/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,7 @@ def get_redundants():
"singularity/*",
"/cores",
"/work",
"*.part*",
"docs/",
"/pilot3"]

Expand Down
5 changes: 4 additions & 1 deletion pilot/user/atlas/loopingjob_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2018
# - Paul Nilsson, [email protected], 2018-2022

from os.path import join


def allow_loopingjob_detection():
Expand Down Expand Up @@ -34,6 +36,7 @@ def remove_unwanted_files(workdir, files):
_files = []
for _file in files:
if not (workdir == _file or
_file == join(workdir, 'workDir') or
"prmon" in _file or
"/pilot" in _file or
"/pandawnutil" in _file or
Expand Down
14 changes: 7 additions & 7 deletions pilot/user/atlas/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2018
# - Paul Nilsson, [email protected], 2018-2022
# - Alexander Bogdanchikov, [email protected], 2020

# from pilot.util.container import execute

import os
import logging

from pilot.user.atlas.setup import get_file_system_root_path
# from pilot.user.atlas.setup import get_file_system_root_path
from pilot.util.container import execute
from pilot.common.errorcodes import ErrorCodes
from time import time
Expand Down Expand Up @@ -44,11 +44,11 @@ def verify_proxy(limit=None, x509=None, proxy_id="pilot", test=False):
else:
envsetup = ''

envsetup += ". %s/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh --quiet;" % get_file_system_root_path()
if os.environ.get('ALRB_noGridMW', '').lower() != "yes":
envsetup += "lsetup emi;"
else:
logger.warning('Skipping "lsetup emi" as ALRB_noGridMW=YES')
# envsetup += ". %s/atlas.cern.ch/repo/ATLASLocalRootBase/user/atlasLocalSetup.sh --quiet;" % get_file_system_root_path()
#if os.environ.get('ALRB_noGridMW', '').lower() != "yes":
# envsetup += "lsetup emi;"
#else:
# logger.warning('Skipping "lsetup emi" as ALRB_noGridMW=YES')

# first try to use arcproxy since voms-proxy-info is not working properly on SL6
# (memory issues on queues with limited memory)
Expand Down
6 changes: 3 additions & 3 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
#
# Authors:
# - Mario Lassnig, [email protected], 2017
# - Paul Nilsson, [email protected], 2018-2021
# - Paul Nilsson, [email protected], 2018-2022

from os import environ

# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '1' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '63' # build number should be reset to '1' for every new development cycle
REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '10' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
2 changes: 1 addition & 1 deletion pilot/util/math.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def chi2(observed, expected):
if 0 in expected:
return 0.0

return sum((_o - _e) ** 2 / _e for _o, _e in zip(observed, expected))
return sum((_o - _e) ** 2 / _e ** 2 for _o, _e in zip(observed, expected))


def float_to_rounded_string(num, precision=3):
Expand Down
8 changes: 6 additions & 2 deletions pilot/util/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2018-2021
# - Paul Nilsson, [email protected], 2018-2022

# This module contains implementations of job monitoring tasks

Expand Down Expand Up @@ -631,7 +631,7 @@ def get_max_allowed_work_dir_size():
# grace margin, as discussed in https://its.cern.ch/jira/browse/ATLASPANDA-482
margin = 10.0 # percent, read later from somewhere
maxwdirsize = int(maxwdirsize * (1 + margin / 100.0))
logger.info(f"work directory size check will use {maxwdirsize} B as a max limit (10%% grace limit added)")
logger.info(f"work directory size check will use {maxwdirsize} B as a max limit (10% grace limit added)")

return maxwdirsize

Expand Down Expand Up @@ -692,6 +692,10 @@ def check_output_file_sizes(job):
max_fsize = human2bytes(config.Pilot.maximum_output_file_size)
if fsize and fsize < max_fsize:
logger.info(f'output file {path} is within allowed size limit ({fsize} B < {max_fsize} B)')
elif fsize == 0:
exit_code = errors.EMPTYOUTPUTFILE
diagnostics = f'zero size output file detected: {path}'
logger.warning(diagnostics)
else:
exit_code = errors.OUTPUTFILETOOLARGE
diagnostics = f'output file {path} is not within allowed size limit ({fsize} B > {max_fsize} B)'
Expand Down
11 changes: 7 additions & 4 deletions pilot/util/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2018-2021
# - Paul Nilsson, [email protected], 2018-2022

import os
import time
Expand Down Expand Up @@ -523,9 +523,12 @@ def get_instant_cpu_consumption_time(pid):
if pid and hz and hz > 0:
path = "/proc/%d/stat" % pid
if os.path.exists(path):
with open(path) as fp:
fields = fp.read().split(' ')[13:17]
utime, stime, cutime, cstime = [(float(f) / hz) for f in fields]
try:
with open(path) as fp:
fields = fp.read().split(' ')[13:17]
utime, stime, cutime, cstime = [(float(f) / hz) for f in fields]
except (FileNotFoundError, IOError) as exc:
logger.warning(f'exception caught: {exc} (ignored)')

if utime and stime and cutime and cstime:
# sum up all the user+system times for both the main process (pid) and the child processes
Expand Down
13 changes: 6 additions & 7 deletions pilot/util/realtimelogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,16 @@ def __init__(self, args, info_dic, level=INFO):
server,
port,
timeout=5.0,
ssl_enable=False,
ssl_enable=True,
ssl_verify=False,
user='pilot',
username='pilot',
password='XXX'
)
# Create the handler
_handler = AsynchronousLogstashHandler(
host=server,
port=port,
server,
port,
transport=transport,
ssl_enable=False,
ssl_verify=False,
database_path='logstash_test.db')
else:
logger.warning(f'unknown logtype: {logtype}')
Expand All @@ -137,7 +135,8 @@ def set_jobinfo(self, job):
self.jobinfo = {"TaskID": job.taskid, "PandaJobID": job.jobid}
if 'HARVESTER_WORKER_ID' in os.environ:
self.jobinfo["Harvester_WorkerID"] = os.environ.get('HARVESTER_WORKER_ID')
logger.debug('set_jobinfo with PandaJobID=%s', self.jobinfo["PandaJobID"])
if 'HARVESTER_ID' in os.environ:
self.jobinfo["Harvester_ID"] = os.environ.get('HARVESTER_ID')

# prepend some panda job info
# check if the msg is a dict-based object via isinstance(msg,dict),
Expand Down

0 comments on commit f2ce9f3

Please sign in to comment.