diff --git a/PILOTVERSION b/PILOTVERSION
index 5afe5cc8d..92014e3dc 100644
--- a/PILOTVERSION
+++ b/PILOTVERSION
@@ -1 +1 @@
-3.2.2.22
\ No newline at end of file
+3.2.3.27
\ No newline at end of file
diff --git a/pilot.py b/pilot.py
index 0b5995c59..0f0b25a13 100755
--- a/pilot.py
+++ b/pilot.py
@@ -248,6 +248,10 @@ def get_args():
                             dest='queuedata_url',
                             default='',
                             help='Queuedata server URL')
+    arg_parser.add_argument('--storagedata-url',
+                            dest='storagedata_url',
+                            default='',
+                            help='URL for downloading DDM end points data')
 
     # Country group
     arg_parser.add_argument('--country-group',
@@ -458,6 +462,7 @@ def set_environment_variables():
     # keep track of the server urls
     environ['PANDA_SERVER_URL'] = get_panda_server(args.url, args.port)
    environ['QUEUEDATA_SERVER_URL'] = '%s' % args.queuedata_url
+    environ['STORAGEDATA_SERVER_URL'] = '%s' % args.storagedata_url
 
 
 def wrap_up():
diff --git a/pilot/control/job.py b/pilot/control/job.py
index b3c55fe28..d1011d1da 100644
--- a/pilot/control/job.py
+++ b/pilot/control/job.py
@@ -477,6 +477,12 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False):
         logger.info('faking a \'tobekilled\' command')
         res['command'] = 'tobekilled'
 
+    if 'pilotSecrets' in res:
+        try:
+            job.pilotsecrets = res.get('pilotSecrets')
+        except Exception as exc:
+            logger.warning(f'failed to parse pilotSecrets: {exc}')
+
     if 'command' in res and res.get('command') != 'NULL':
         # warning: server might return comma-separated string, 'debug,tobekilled'
         cmd = res.get('command')
@@ -1463,6 +1469,8 @@ def getjob_server_command(url, port):
         url = 'https://' + url
         logger.warning('detected missing protocol in server url (added)')
 
+    # randomize server name
+    url = https.get_panda_server(url, port)
     return '{pandaserver}/server/panda/getJob'.format(pandaserver=url)
 
 
diff --git a/pilot/control/payload.py b/pilot/control/payload.py
index 6f4b569c7..e5ed4f230 100644
--- a/pilot/control/payload.py
+++ b/pilot/control/payload.py
@@ -22,7 +22,7 @@
 from pilot.util.auxiliary import set_pilot_state
 from pilot.util.processes import get_cpu_consumption_time
 from pilot.util.config import config
-from pilot.util.filehandling import read_file, remove_core_dumps, get_guid, extract_lines_from_file
+from pilot.util.filehandling import read_file, remove_core_dumps, get_guid, extract_lines_from_file, find_file
 from pilot.util.processes import threads_aborted
 from pilot.util.queuehandling import put_in_queue
 from pilot.common.errorcodes import ErrorCodes
@@ -321,33 +321,32 @@ def get_transport(catchall):
     return transport
 
 
-def get_logging_info(realtimelogging, catchall, realtime_logname, realtime_logging_server):
+def get_logging_info(job, args):
     """
     Extract the logging type/protocol/url/port from catchall if present, or from args fields.
 
     Returns a dictionary with the format: {'logging_type': .., 'protocol': .., 'url': .., 'port': .., 'logname': ..}
 
+    If the provided debug_command contains a tail instruction ('tail log_file_name'), the pilot will locate
+    the log file and use that for RT logging (full path).
+
     Note: the returned dictionary can be built with either args (has priority) or catchall info.
 
-    :param realtimelogging: True if real-time logging was activated by server/job definition (Boolean).
-    :param catchall: PQ.catchall field (string).
-    :param realtime_logname from pilot args: (string).
-    :param realtime_logging_server from pilot args: (string).
+    :param job: job object.
+    :param args: args object.
     :return: info dictionary (logging_type (string), protocol (string), url (string), port (int)).
     """
 
     info_dic = {}
 
-    if 'logging=' not in catchall and not realtimelogging:
-        #logger.debug(f'catchall={catchall}')
-        #logger.debug(f'realtimelogging={realtimelogging}')
+    if 'logging=' not in job.infosys.queuedata.catchall and not job.realtimelogging:
         return {}
 
     # args handling
-    info_dic['logname'] = realtime_logname if realtime_logname else "pilot-log"
-    logserver = realtime_logging_server if realtime_logging_server else ""
+    info_dic['logname'] = args.realtime_logname if args.realtime_logname else "pilot-log"
+    logserver = args.realtime_logging_server if args.realtime_logging_server else ""
 
     pattern = r'logging\=(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)'
-    info = findall(pattern, catchall)
+    info = findall(pattern, job.infosys.queuedata.catchall)
 
     if not logserver and not info:
         logger.warning('not enough info available for activating real-time logging')
@@ -363,10 +362,26 @@ def get_logging_info(realtimelogging, catchall, realtime_logname, realtime_loggi
             print(f'exception caught: {exc}')
             info_dic = {}
         else:
-            # experiment specific, move to relevant code
-
-            # ATLAS (testing; get info from debug parameter later)
-            info_dic['logfiles'] = [config.Payload.payloadstdout]
+            path = None
+            if 'tail' in job.debug_command:
+                filename = job.debug_command.split(' ')[-1]
+                logger.debug(f'filename={filename}')
+                counter = 0
+                path = None
+                maxwait = 5 * 60
+                while counter < maxwait and not args.graceful_stop.is_set():
+                    path = find_file(filename, job.workdir)
+                    if not path:
+                        logger.debug(f'file {filename} not found, waiting for max {maxwait} s')
+                        time.sleep(10)
+                    else:
+                        break
+                    counter += 10
+                if not path:
+                    logger.warning(f'file {filename} was not found for {maxwait} s, using default')
+            logf = path if path else config.Payload.payloadstdout
+            logger.info(f'using {logf} for real-time logging')
+            info_dic['logfiles'] = [logf]
     else:
         items = logserver.split(':')
         info_dic['logging_type'] = items[0].lower()
@@ -414,14 +429,15 @@ def run_realtimelog(queues, traces, args):
         except queue.Empty:
             continue
 
-        # wait with proceeding until the job is running, or max 5 * 60 s
-        counter = 0
-        while counter < 5 * 60 and not args.graceful_stop.is_set():
+        # wait with proceeding until the job is running
+        while not args.graceful_stop.is_set():
             if job.state == 'running':
                 logger.debug('job is running, time to start real-time logger [if needed]')
                 break
+            if job.state == 'stageout' or job.state == 'failed':
+                logger.debug(f'job is in state {job.state}, continue to next job')
+                continue
             time.sleep(1)
-            counter += 1
 
         if args.use_realtime_logging:
             # always do real-time logging
@@ -431,8 +447,8 @@ def run_realtimelog(queues, traces, args):
         logger.debug(f'debug={job.debug}')
         logger.debug(f'debug_command={job.debug_command}')
         logger.debug(f'args.use_realtime_logging={args.use_realtime_logging}')
-        if job.debug and (not job.debug_command or job.debug_command == 'debug') and not args.use_realtime_logging:
-            logger.info('turning on real-time logging since debug flag is true and debug_command is not set')
+        if job.debug and (not job.debug_command or job.debug_command == 'debug' or 'tail' in job.debug_command) and not args.use_realtime_logging:
+            logger.info('turning on real-time logging')
             job.realtimelogging = True
 
         # testing
@@ -441,10 +457,7 @@ def run_realtimelog(queues, traces, args):
         if not job.realtimelogging:
             info_dic = None
         # only set info_dic once per job (the info will not change)
-        info_dic = get_logging_info(job.realtimelogging,
-                                    job.infosys.queuedata.catchall,
-                                    args.realtime_logname,
-                                    args.realtime_logging_server) if not info_dic and job.realtimelogging else info_dic
+        info_dic = get_logging_info(job, args) if not info_dic and job.realtimelogging else info_dic
         logger.debug(f'info_dic={info_dic}')
         if info_dic:
             args.use_realtime_logging = True
diff --git a/pilot/info/extinfo.py b/pilot/info/extinfo.py
index 89db5629a..8a1b72418 100644
--- a/pilot/info/extinfo.py
+++ b/pilot/info/extinfo.py
@@ -5,7 +5,7 @@
 #
 # Authors:
 # - Alexey Anisenkov, anisyonk@cern.ch, 2018-2021
-# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2019
+# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022
 
 """
 Information provider from external source(s)
@@ -128,8 +128,7 @@ def jsonparser_panda(c):
             return {pandaqueue: dat}
 
         queuedata_url = (os.environ.get('QUEUEDATA_SERVER_URL') or getattr(config.Information, 'queuedata_url', '')).format(**{'pandaqueue': pandaqueues[0]})
-
-        cric_url = getattr(config.Information, 'queues_url', None) or 'https://atlas-cric.cern.ch/cache/schedconfig/{pandaqueue}.json'
+        cric_url = getattr(config.Information, 'queues_url', None)
         cric_url = cric_url.format(pandaqueue=pandaqueues[0] if len(pandaqueues) == 1 else 'pandaqueues')
         cvmfs_path = self.get_cvmfs_path(getattr(config.Information, 'queuedata_cvmfs', None), 'cric_pandaqueues.json')
 
@@ -178,11 +177,13 @@ def load_storage_data(self, ddmendpoints=[], priority=[], cache_time=60):
         cache_dir = os.environ.get('PILOT_HOME', '.')
 
         # list of sources to fetch ddmconf data from
+        _storagedata_url = os.environ.get('STORAGEDATA_SERVER_URL', '')
+        storagedata_url = _storagedata_url if _storagedata_url else getattr(config.Information, 'storages_url', None)
         cvmfs_path = self.get_cvmfs_path(config.Information.storages_cvmfs, 'cric_ddmendpoints.json')
         sources = {'CVMFS': {'url': cvmfs_path,
                              'nretry': 1,
                              'fname': os.path.join(cache_dir, getattr(config.Information, 'storages_cache', None) or 'agis_ddmendpoints.json')},
-                   'CRIC': {'url': (getattr(config.Information, 'storages_url', None) or 'https://atlas-cric.cern.ch/cache/ddmendpoints.json'),
+                   'CRIC': {'url': storagedata_url,
                             'nretry': 3,
                             'sleep_time': lambda: 15 + random.randint(0, 30),  ## max sleep time 45 seconds between retries
                             'cache_time': 3 * 60 * 60,  # 3 hours
diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py
index c396e62cd..3f3fc8c3b 100644
--- a/pilot/user/atlas/common.py
+++ b/pilot/user/atlas/common.py
@@ -1905,7 +1905,7 @@ def get_redundants():
                 "home",
                 "o..pacman..o",
                 "pacman-*",
-                "python",
+                "python*",
                 "runAthena*",
                 "share",
                 "sources.*",
@@ -1940,16 +1940,22 @@ def get_redundants():
                 "pandawnutil/*",
                 "src/*",
                 "singularity_cachedir",
+                "apptainer_cachedir",
                 "_joproxy15",
                 "HAHM_*",
                 "Process",
                 "merged_lhef._0.events-new",
                 "panda_secrets.json",
                 "singularity/*",
+                "apptainer/*",
                 "/cores",
+                "/panda_pilot*",
                 "/work",
+                "README*",
+                "MANIFEST*",
                 "*.part*",
                 "docs/",
+                "/venv/",
                 "/pilot3"]
 
     return dir_list
diff --git a/pilot/user/atlas/cpu.py b/pilot/user/atlas/cpu.py
index 3633ec6da..411b97412 100644
--- a/pilot/user/atlas/cpu.py
+++ b/pilot/user/atlas/cpu.py
@@ -12,7 +12,7 @@
 
 # from .utilities import get_memory_values
 from pilot.util.container import execute
-
+from .utilities import get_memory_values
 
 logger = logging.getLogger(__name__)
 
@@ -60,11 +60,11 @@ def add_core_count(corecount, core_counts=[]):
     return core_counts
 
 
-def set_core_counts(job):
+def set_core_counts(**kwargs):
     """
     Set the number of used cores.
 
-    :param job: job object.
+    :param kwargs: kwargs (dictionary).
     :return:
     """
 
@@ -86,12 +86,32 @@ def set_core_counts(job):
     #else:
     #    logger.debug('no summary_dictionary')
 
-    if job.pgrp:
-        # for debugging
-        #cmd = "ps axo pgid,psr,comm,args | grep %d" % job.pgrp
-        #exit_code, stdout, stderr = execute(cmd, mute=True)
-        #logger.debug('%s:\n%s\n', cmd, stdout)
-
+    job = kwargs.get('job', None)
+    walltime = kwargs.get('walltime', None)
+
+    if job and walltime:
+        summary_dictionary = get_memory_values(job.workdir, name=job.memorymonitor)
+        if summary_dictionary:
+            time_dictionary = summary_dictionary.get('Time', None)
+            if time_dictionary:
+                stime = time_dictionary.get('stime', None)
+                utime = time_dictionary.get('utime', None)
+                if stime and utime:
+                    logger.debug(f'stime={stime}')
+                    logger.debug(f'utime={utime}')
+                    logger.debug(f'walltime={walltime}')
+                    cores = float(stime + utime) / float(walltime)
+                    logger.debug(f'number of cores={cores}')
+                else:
+                    logger.debug('no stime/utime')
+            else:
+                logger.debug('no time dictionary')
+        else:
+            logger.debug('no summary dictionary')
+    else:
+        logger.debug(f'failed to calculate number of cores (walltime={walltime})')
+
+    if job and job.pgrp:
         # ps axo pgid,psr -> 154628 8 \n 154628 9 \n 1546280 1 ..
         # sort is redundant; uniq removes any duplicate lines; wc -l gives the final count
         # awk is added to get the pgrp list only and then grep -x makes sure that false positives are removed, e.g. 1546280
diff --git a/pilot/user/atlas/diagnose.py b/pilot/user/atlas/diagnose.py
index 5df666a3a..9472dc648 100644
--- a/pilot/user/atlas/diagnose.py
+++ b/pilot/user/atlas/diagnose.py
@@ -610,7 +610,7 @@ def process_job_report(job):
                     job.piloterrordiag = diagnostics
                 else:
                     # extract Frontier errors
-                    errmsg = get_more_details(job.metadata)
+                    errmsg = get_frontier_details(job.metadata)
                     if errmsg:
                         msg = f'Frontier error: {errmsg}'
                         job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.FRONTIER, msg=msg)
@@ -634,7 +634,7 @@ def process_job_report(job):
                 job.piloterrordiag = diagnostics
 
 
-def get_more_details(job_report_dictionary):
+def get_frontier_details(job_report_dictionary):
     """
     Extract special Frontier related errors from the job report.
 
@@ -642,7 +642,12 @@ def get_more_details(job_report_dictionary):
     :return: extracted error message (string).
     """
 
-    error_details = job_report_dictionary['executor'][0]['logfileReport']['details']
+    try:
+        error_details = job_report_dictionary['executor'][0]['logfileReport']['details']
+    except KeyError as exc:
+        logger.warning(f'key error: {exc} (ignore detailed Frontier analysis)')
+        return ""
+
     patterns = {'abnormalLines': r'Cannot\sfind\sa\svalid\sfrontier\sconnection(.*)',
                 'lastNormalLine': r'Using\sfrontier\sconnection\sfrontier(.*)'}
     errmsg = ''
diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py
index 564b543cb..dd61397ff 100644
--- a/pilot/user/atlas/utilities.py
+++ b/pilot/user/atlas/utilities.py
@@ -127,57 +127,6 @@ def get_memory_monitor_setup(pid, pgrp, jobid, workdir, command, setup="", use_c
     return cmd, pid
 
 
-def get_memory_monitor_setup_old(pid, pgrp, jobid, workdir, command, setup="", use_container=True, transformation="", outdata=None, dump_ps=False):
-    """
-    Return the proper setup for the memory monitor.
-    If the payload release is provided, the memory monitor can be setup with the same release. Until early 2018, the
-    memory monitor was still located in the release area. After many problems with the memory monitor, it was decided
-    to use a fixed version for the setup. Currently, release 21.0.22 is used.
-
-    :param pid: job process id (int).
-    :param pgrp: process group id (int).
-    :param jobid: job id (int).
-    :param workdir: job work directory (string).
-    :param command: payload command (string).
-    :param setup: optional setup in case asetup can not be used, which uses infosys (string).
-    :param use_container: optional boolean.
-    :param transformation: optional name of transformation, e.g. Sim_tf.py (string).
-    :param outdata: optional list of output fspec objects (list).
-    :param dump_ps: should ps output be dumped when identifying prmon process? (Boolean).
-    :return: job work directory (string), pid for process inside container (int).
-    """
-
-    # try to get the pid from a pid.txt file which might be created by a container_script
-    pid = get_proper_pid(pid, pgrp, jobid, command=command, transformation=transformation, outdata=outdata, use_container=use_container, dump_ps=dump_ps)
-    if pid == -1:
-        logger.warning('process id was not identified before payload finished - will not launch memory monitor')
-        return "", pid
-
-    release = "22.0.1"
-    platform = "x86_64-centos7-gcc8-opt"
-    if not setup:
-        setup = get_asetup() + " Athena," + release + " --platform " + platform
-    interval = 60
-    if not setup.endswith(';'):
-        setup += ';'
-    # Decide which version of the memory monitor should be used
-    cmd = "%swhich prmon" % setup
-    exit_code, stdout, stderr = execute(cmd)
-    if stdout and "Command not found" not in stdout:
-        _cmd = "prmon "
-    else:
-        logger.warning('failed to find prmon, defaulting to old memory monitor: %d, %s' % (exit_code, stderr))
-        _cmd = "MemoryMonitor "
-        setup = setup.replace(release, "21.0.22")
-        setup = setup.replace(platform, "x86_64-slc6-gcc62-opt")
-
-    options = "--pid %d --filename %s --json-summary %s --interval %d" %\
-              (pid, get_memory_monitor_output_filename(), get_memory_monitor_summary_filename(), interval)
-    _cmd = "cd " + workdir + ";" + setup + _cmd + options
-
-    return _cmd, pid
-
-
 def get_proper_pid(pid, pgrp, jobid, command="", transformation="", outdata="", use_container=True, dump_ps=False):
     """
     Return a pid from the proper source to be used with the memory monitor.
@@ -607,7 +556,7 @@ def get_average_summary_dictionary_prmon(path):
     if dictionary:
 
         # Calculate averages and store all values
-        summary_dictionary = {"Max": {}, "Avg": {}, "Other": {}}
+        summary_dictionary = {"Max": {}, "Avg": {}, "Other": {}, "Time": {}}
 
         def filter_value(value):
            """ Inline function used to remove any string or None values from data. """
@@ -632,11 +581,16 @@ def filter_value(value):
 
        # add the last of the rchar, .., values
        keys = ['rchar', 'wchar', 'read_bytes', 'write_bytes', 'nprocs']
+        time_keys = ['stime', 'utime']
+        keys = keys + time_keys
        # warning: should read_bytes/write_bytes be reported as rbytes/wbytes?
        for key in keys:
            value = get_last_value(dictionary.get(key, None))
            if value:
-                summary_dictionary["Other"][key] = value
+                if key in time_keys:
+                    summary_dictionary["Time"][key] = value
+                else:
+                    summary_dictionary["Other"][key] = value
 
     return summary_dictionary
 
diff --git a/pilot/user/generic/cpu.py b/pilot/user/generic/cpu.py
index 1f92fab38..b42ce0d87 100644
--- a/pilot/user/generic/cpu.py
+++ b/pilot/user/generic/cpu.py
@@ -36,15 +36,16 @@ def add_core_count(corecount, core_counts=[]):
     return core_counts.append(corecount)
 
 
-def set_core_counts(job):
+def set_core_counts(**kwargs):
     """
     Set the number of used cores.
 
-    :param job: job object.
+    :param kwargs: kwargs (dictionary).
     :return:
     """
 
-    if job.pgrp:
+    job = kwargs.get('job', None)
+    if job and job.pgrp:
         cmd = "ps axo pgid,psr | sort | grep %d | uniq | awk '{print $1}' | grep -x %d | wc -l" % (job.pgrp, job.pgrp)
         exit_code, stdout, stderr = execute(cmd, mute=True)
         logger.debug('%s: %s' % (cmd, stdout))
diff --git a/pilot/user/rubin/cpu.py b/pilot/user/rubin/cpu.py
index 38e211936..ec09d786f 100644
--- a/pilot/user/rubin/cpu.py
+++ b/pilot/user/rubin/cpu.py
@@ -36,15 +36,16 @@ def add_core_count(corecount, core_counts=[]):
     return core_counts.append(corecount)
 
 
-def set_core_counts(job):
+def set_core_counts(**kwargs):
     """
     Set the number of used cores.
 
-    :param job: job object.
+    :param kwargs: kwargs (dictionary).
     :return:
     """
 
-    if job.pgrp:
+    job = kwargs.get('job', None)
+    if job and job.pgrp:
         cmd = "ps axo pgid,psr | sort | grep %d | uniq | awk '{print $1}' | grep -x %d | wc -l" % (job.pgrp, job.pgrp)
         exit_code, stdout, stderr = execute(cmd, mute=True)
         logger.debug('%s: %s' % (cmd, stdout))
diff --git a/pilot/user/sphenix/cpu.py b/pilot/user/sphenix/cpu.py
index 3f16cc109..c0d3a3354 100644
--- a/pilot/user/sphenix/cpu.py
+++ b/pilot/user/sphenix/cpu.py
@@ -36,15 +36,16 @@ def add_core_count(corecount, core_counts=[]):
     return core_counts.append(corecount)
 
 
-def set_core_counts(job):
+def set_core_counts(**kwargs):
     """
     Set the number of used cores.
 
-    :param job: job object.
+    :param kwargs: kwargs (dictionary).
     :return:
     """
 
-    if job.pgrp:
+    job = kwargs.get('job', None)
+    if job and job.pgrp:
         cmd = "ps axo pgid,psr | sort | grep %d | uniq | awk '{print $1}' | grep -x %d | wc -l" % (job.pgrp, job.pgrp)
         exit_code, stdout, stderr = execute(cmd, mute=True)
         logger.debug('%s: %s' % (cmd, stdout))
diff --git a/pilot/util/constants.py b/pilot/util/constants.py
index c6d72915c..fd87f2bcc 100644
--- a/pilot/util/constants.py
+++ b/pilot/util/constants.py
@@ -13,8 +13,8 @@
 # Pilot version
 RELEASE = '3'  # released number should be fixed at 3 for Pilot 3
 VERSION = '2'  # version number is '1' for first release, '0' until then, increased for bigger updates
-REVISION = '2'  # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '22'  # build number should be reset to '1' for every new development cycle
+REVISION = '3'  # revision number should be reset to '0' for every new version release, increased for small updates
+BUILD = '27'  # build number should be reset to '1' for every new development cycle
 
 SUCCESS = 0
 FAILURE = 1
diff --git a/pilot/util/filehandling.py b/pilot/util/filehandling.py
index 675887323..443cfcec3 100644
--- a/pilot/util/filehandling.py
+++ b/pilot/util/filehandling.py
@@ -20,6 +20,7 @@
 from glob import glob
 from json import load, JSONDecodeError
 from json import dump as dumpjson
+from pathlib import Path
 from shutil import copy2, rmtree
 from zlib import adler32
 from functools import partial
@@ -1169,3 +1170,22 @@ def extract_lines_from_file(pattern, filename):
         logger.warning(f'exception caught opening file: {exc}')
 
     return _lines
+
+
+def find_file(filename, startdir):
+    """
+    Locate a file in a subdirectory to the given start directory.
+
+    :param filename: file name (string).
+    :param startdir: start directory for search (string).
+    :return: full path (string).
+    """
+
+    logger.debug(f'looking for {filename} in start dir {startdir}')
+    _path = None
+    for path in Path(startdir).rglob(filename):
+        logger.debug(f'located file at: {path}')
+        _path = path.as_posix()
+        break
+
+    return _path
diff --git a/pilot/util/https.py b/pilot/util/https.py
index 498304fad..4b86eeac9 100644
--- a/pilot/util/https.py
+++ b/pilot/util/https.py
@@ -388,7 +388,14 @@ def send_update(update_function, data, url, port, job=None):
         if job:
             txt += f' for job {job.jobid}'
         logger.info(txt)
+        # hide sensitive info
+        pilotsecrets = ''
+        if res and 'pilotSecrets' in res:
+            pilotsecrets = res['pilotSecrets']
+            res['pilotSecrets'] = '********'
         logger.info(f'server responded with: res = {res}')
+        if pilotsecrets:
+            res['pilotSecrets'] = pilotsecrets
         attempt += 1
 
     return res
diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py
index 4e921df95..637696eed 100644
--- a/pilot/util/monitoring.py
+++ b/pilot/util/monitoring.py
@@ -18,13 +18,14 @@
 from pilot.common.exception import PilotException
 from pilot.util.auxiliary import set_pilot_state, show_memory_usage
 from pilot.util.config import config
+from pilot.util.constants import PILOT_PRE_PAYLOAD
 from pilot.util.container import execute
 from pilot.util.filehandling import get_disk_usage, remove_files, get_local_file_size, read_file
 from pilot.util.loopingjob import looping_job
 from pilot.util.math import convert_mb_to_b, human2bytes
 from pilot.util.parameters import convert_to_int, get_maximum_input_sizes
 from pilot.util.processes import get_current_cpu_consumption_time, kill_processes, get_number_of_child_processes
-from pilot.util.timing import get_time_since_start
+from pilot.util.timing import get_time_since
 from pilot.util.workernode import get_local_disk_space, check_hz
 
 import logging
@@ -67,7 +68,8 @@ def job_monitor_tasks(job, mt, args):
             logger.info(f'CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})')
 
         # check how many cores the payload is using
-        set_number_used_cores(job)
+        time_since_start = get_time_since(job.jobid, PILOT_PRE_PAYLOAD, args)  # payload walltime
+        set_number_used_cores(job, time_since_start)
 
         # check memory usage (optional) for jobs in running state
         exit_code, diagnostics = verify_memory_usage(current_time, mt, job)
@@ -170,18 +172,24 @@ def get_exception_error_code(diagnostics):
     return exit_code
 
 
-def set_number_used_cores(job):
+def set_number_used_cores(job, walltime):
     """
     Set the number of cores used by the payload.
     The number of actual used cores is reported with job metrics (if set).
+    The walltime can be used to estimate the number of used cores in combination with memory monitor output,
+    (utime+stime)/walltime. If memory monitor information is not available, a ps command is used (not reliable for
+    multi-core jobs).
 
     :param job: job object.
+    :param walltime: wall time for payload in seconds (int).
     :return:
     """
 
     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
     cpu = __import__('pilot.user.%s.cpu' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
-    cpu.set_core_counts(job)
+
+    kwargs = {'job': job, 'walltime': walltime}
+    cpu.set_core_counts(**kwargs)
 
 
 def verify_memory_usage(current_time, mt, job):
@@ -292,11 +300,11 @@ def verify_looping_job(current_time, mt, job, args):
         logger.debug('looping check not desired')
         return 0, ""
 
-    time_since_start = int(get_time_since_start(args))
+    time_since_start = get_time_since(job.jobid, PILOT_PRE_PAYLOAD, args)  # payload walltime
     looping_verification_time = convert_to_int(config.Pilot.looping_verification_time, default=600)
     if time_since_start < looping_verification_time:
-        logger.debug(f'no point in running looping job algorithm since time_since_start={time_since_start} s < '
-                     f'looping_verification_time={looping_verification_time} s')
+        logger.debug(f'no point in running looping job algorithm since time since last payload start={time_since_start} s < '
+                     f'looping verification time={looping_verification_time} s')
         return 0, ""
 
     if current_time - mt.get('ct_looping') > looping_verification_time:
diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py
index c495d7f41..00c9a5715 100644
--- a/pilot/util/realtimelogger.py
+++ b/pilot/util/realtimelogger.py
@@ -129,7 +129,11 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO):
         #    cert=(crt, key)
         #)
 
+        #logger.debug("{secrets.get('logstash_login', 'unknown_login')}:{secrets.get('logstash_password', 'unknown_password'}")
         # login+password method:
+        if isinstance(secrets, str):
+            secrets = json.loads(secrets)
+
         transport = HttpTransport(
             server,
             port,
@@ -178,7 +182,7 @@ def send_with_jobinfo(self, msg):
                 logobj.update(msg)
             except Exception:
                 logobj["message"] = msg
-        # logger.debug(f'message: {msg}')
+
         self.info(logobj)
 
     def add_logfiles(self, job_or_filenames, reset=True):
@@ -235,7 +239,7 @@ def sending_logs(self, args, job):
                     self.openfiles[logfile] = openfile
                     logger.debug(f'opened logfile: {logfile}')
 
-            # logger.debug(f'real-time logging: sending logs for state={job.state} [1]')
+            logger.debug(f'real-time logging: sending logs for state={job.state} [1]')
             self.send_loginfiles()
         elif job.state == 'stagein' or job.state == 'stageout':
             logger.debug('no real-time logging during stage-in/out')
diff --git a/pilot/util/transport.py b/pilot/util/transport.py
index 1a56c4e1d..9f4790189 100644
--- a/pilot/util/transport.py
+++ b/pilot/util/transport.py
@@ -12,9 +12,9 @@
 import socket
 import ssl
 
-from requests.auth import HTTPBasicAuth
-import requests
 try:
+    from requests.auth import HTTPBasicAuth
+    import requests
     import pylogbeat
     from logstash_async.utils import ichunked
 except ImportError:
@@ -365,7 +365,12 @@ def send(self, events: list, **kwargs):
         :param events: A list of events
         :type events: list
         """
-        self.__session = requests.Session()
+        try:
+            self.__session = requests.Session()
+        except Exception:
+            logger.warning('no requests module')
+            return
+
         #print(self._cert)
         for batch in self.__batches(events):
             if self._use_logging: