diff --git a/.github/workflows/flake8-workflow.yml b/.github/workflows/flake8-workflow.yml new file mode 100644 index 00000000..a4aa5278 --- /dev/null +++ b/.github/workflows/flake8-workflow.yml @@ -0,0 +1,44 @@ +# This workflow will install Python dependencies, run tests and lint with a variety of Python versions +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python + +name: flake8-workflow + +on: + push: + branches: [ "master", "next" ] + pull_request: + branches: [ "master", "next" ] + +jobs: + build: + + runs-on: ubuntu-latest + continue-on-error: true + strategy: + fail-fast: false + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11"] + env: + FLAKE8_VERSION: "==6.1.0" + FLAKE8_CONFIG: ".flake8" + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install "flake8${{ env.FLAKE8_VERSION }}" 'pep8-naming' 'flake8-blind-except' + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Setup env + run: | + pwd + ls -lah + pip freeze + flake8 --version + if [[ ${{ env.FLAKE8_CONFIG }} != ".flake8" ]]; then rm .flake8; fi + - name: Flake8 + run: flake8 --config ${{ env.FLAKE8_CONFIG}} pilot.py pilot/ diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 33d99290..88d5d932 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -12,40 +12,15 @@ jobs: strategy: matrix: python-version: ['3.8', '3.9', '3.10', '3.11'] - env: - FLAKE8_VERSION: "==3.8.4" - FLAKE8_CONFIG: ".flake8" steps: - name: Checkout Pilot3 repo uses: actions/checkout@v3 - # - name: Hack me some python - # run: | - # Hack to get setup-python to work on act - #if [ ! 
-f "/etc/lsb-release" ] ; then - # echo "DISTRIB_RELEASE=18.04" > /etc/lsb-release - # fi - - name: Setup Python3 uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} architecture: x64 - - name: Pip install - run: pip install "flake8${{ env.FLAKE8_VERSION }}" 'pep8-naming' 'flake8-blind-except' - - - name: Setup env - run: | - pwd - ls -lah - pip freeze - flake8 --version - if [[ ${{ env.FLAKE8_CONFIG }} != ".flake8" ]]; then rm .flake8; fi - - - name: Run flake8 - run: flake8 --config ${{ env.FLAKE8_CONFIG}} pilot.py pilot/ - - name: Run unit tests run: python -m unittest - diff --git a/PILOTVERSION b/PILOTVERSION index 2d892f10..e20a74ab 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.7.10 \ No newline at end of file +3.6.8.29 \ No newline at end of file diff --git a/pilot/api/data.py b/pilot/api/data.py index 3e014b58..3bfe31da 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -50,6 +50,7 @@ class StagingClient(object): """ ipv = "IPv6" + workdir = '' mode = "" # stage-in/out, set by the inheritor of the class copytool_modules = {'rucio': {'module_name': 'rucio'}, 'gfal': {'module_name': 'gfal'}, @@ -69,7 +70,7 @@ class StagingClient(object): # list of allowed schemas to be used for transfers from REMOTE sites remoteinput_allowed_schemas = ['root', 'gsiftp', 'dcap', 'srm', 'storm', 'https'] - def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_copytools='rucio', trace_report=None, ipv='IPv6'): + def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_copytools='rucio', trace_report=None, ipv='IPv6', workdir=''): """ If `acopytools` is not specified then it will be automatically resolved via infosys. In this case `infosys` requires initialization. :param acopytools: dict of copytool names per activity to be used for transfers. Accepts also list of names or string value without activity passed. 
@@ -87,6 +88,7 @@ def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_ self.logger = logger self.infosys = infosys_instance or infosys self.ipv = ipv + self.workdir = workdir if isinstance(acopytools, str): acopytools = {'default': [acopytools]} if acopytools else {} @@ -103,7 +105,7 @@ def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_ self.acopytools['default'] = self.get_default_copytools(default_copytools) # get an initialized trace report (has to be updated for get/put if not defined before) - self.trace_report = trace_report if trace_report else TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), ipv=self.ipv) + self.trace_report = trace_report if trace_report else TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), ipv=self.ipv, workdir=self.workdir) if not self.acopytools: msg = 'failed to initilize StagingClient: no acopytools options found, acopytools=%s' % self.acopytools diff --git a/pilot/control/data.py b/pilot/control/data.py index d0f33e97..8ab502f7 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -185,7 +185,7 @@ def create_trace_report(job, label='stage-in'): event_type, localsite, remotesite = get_trace_report_variables(job, label=label) trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=localsite, remoteSite=remotesite, - dataset="", eventType=event_type) + dataset="", eventType=event_type, workdir=job.workdir) trace_report.init(job) return trace_report @@ -239,7 +239,7 @@ def _stage_in(args, job): client = StageInESClient(job.infosys, logger=logger, trace_report=trace_report) activity = 'es_events_read' else: - client = StageInClient(job.infosys, logger=logger, trace_report=trace_report, ipv=args.internet_protocol_version) + client = StageInClient(job.infosys, logger=logger, trace_report=trace_report, ipv=args.internet_protocol_version, workdir=job.workdir) activity = 'pr' use_pcache = job.infosys.queuedata.use_pcache # get the proper input file destination (normally job.workdir unless stager workflow) @@ -950,7 +950,7 @@ def _do_stageout(job, xdata, activity, queue, title, output_dir='', rucio_host=' # create the trace report trace_report = create_trace_report(job, label=label) - client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv) + client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report, ipv=ipv, workdir=job.workdir) kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job, output_dir=output_dir, catchall=job.infosys.queuedata.catchall, rucio_host=rucio_host) #, mode='stage-out') # prod analy unification: use destination preferences from PanDA server for unified queues diff --git a/pilot/control/job.py b/pilot/control/job.py index 46d4d480..08200f66 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -36,14 +36,23 @@ LOG_TRANSFER_IN_PROGRESS, LOG_TRANSFER_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_TROUBLE, SERVER_UPDATE_FINAL, \ SERVER_UPDATE_UPDATING, SERVER_UPDATE_NOT_DONE from pilot.util.container import execute -from pilot.util.filehandling import find_text_files, tail, is_json, copy, remove, write_file, \ - create_symlink, write_json +from pilot.util.filehandling import ( + find_text_files, + tail, + is_json, + copy, + remove, + write_file, + create_symlink, + write_json, + get_total_input_size +) from pilot.util.harvester import request_new_jobs, remove_job_request_file, parse_job_definition_file, \ is_harvester_mode, get_worker_attributes_file, publish_job_report, 
publish_work_report, get_event_status_file, \ publish_stageout_files from pilot.util.jobmetrics import get_job_metrics from pilot.util.loggingsupport import establish_logging -from pilot.util.math import mean +from pilot.util.math import mean, float_to_rounded_string from pilot.util.middleware import containerise_general_command from pilot.util.monitoring import job_monitor_tasks, check_local_space from pilot.util.monitoringtime import MonitoringTime @@ -619,10 +628,6 @@ def get_data_structure(job, state, args, xml=None, metadata=None, final=False): if starttime: data['startTime'] = starttime - job_metrics = get_job_metrics(job) - if job_metrics: - data['jobMetrics'] = job_metrics - if xml is not None: data['xml'] = xml if metadata is not None: @@ -656,6 +661,7 @@ def get_data_structure(job, state, args, xml=None, metadata=None, final=False): cpumodel = get_cpu_cores(cpumodel) # add the CPU cores if not present data['cpuConsumptionUnit'] = job.cpuconsumptionunit + "+" + cpumodel + # CPU instruction set instruction_sets = has_instruction_sets(['AVX2']) product, vendor = get_display_info() if instruction_sets: @@ -666,6 +672,7 @@ def get_data_structure(job, state, args, xml=None, metadata=None, final=False): if product and vendor: logger.debug(f'cpuConsumptionUnit: could have added: product={product}, vendor={vendor}') + # CPU architecture cpu_arch = get_cpu_arch() if cpu_arch: logger.debug(f'cpu arch={cpu_arch}') @@ -673,6 +680,32 @@ def get_data_structure(job, state, args, xml=None, metadata=None, final=False): # add memory information if available add_memory_info(data, job.workdir, name=job.memorymonitor) + + # job metrics + + # add read_bytes from memory monitor to job metrics if available + extra = {} + if 'totRBYTES' in data: + _totalsize = get_total_input_size(job.indata, nolib=True) + logger.debug(f'_totalsize={_totalsize}') + logger.debug(f"current max read_bytes: {data.get('totRBYTES')}") + try: + readfrac = data.get('totRBYTES') / _totalsize + except (TypeError, ZeroDivisionError) as exc: + logger.warning(f"failed to calculate {data.get('totRBYTES')}/{_totalsize}: {exc}") + readfrac = None + else: + readfrac = float_to_rounded_string(readfrac, precision=2) + logger.debug(f'readbyterate={readfrac}') + if readfrac: + extra = {'readbyterate': readfrac} + else: + logger.debug('read_bytes info not yet available') + job_metrics = get_job_metrics(job, extra=extra) + if job_metrics: + data['jobMetrics'] = job_metrics + + # add timing info if finished or failed if state == 'finished' or state == 'failed': add_timing_and_extracts(data, job, state, args) https.add_error_codes(data, job) @@ -2836,103 +2869,102 @@ def job_monitor(queues, traces, args): # noqa: C901 # stop_monitoring = False # continue with the main loop (while cont) for i in range(len(jobs)): - current_id = jobs[i].jobid - #if current_id in no_monitoring: - # stop_monitoring = True - # delta = peeking_time - no_monitoring.get(current_id, 0) - # if delta > 60: - # logger.warning(f'job monitoring has waited {delta} s for job {current_id} to finish - aborting') - # break - - error_code = None - if abort_job and args.signal: - # if abort_job and a kill signal was set - error_code = get_signal_error(args.signal) - elif abort_job: # i.e. 
no kill signal - logger.info('tobekilled seen by job_monitor (error code should already be set) - abort job only') - elif os.environ.get('REACHED_MAXTIME', None): - # the batch system max time has been reached, time to abort (in the next step) - logger.info('REACHED_MAXTIME seen by job monitor - abort everything') - if not args.graceful_stop.is_set(): - logger.info('setting graceful_stop since it was not set already') - args.graceful_stop.set() - error_code = errors.REACHEDMAXTIME - if error_code: - jobs[i].state = 'failed' - jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(error_code) - jobs[i].completed = True - if not jobs[i].completed: # job.completed gets set to True after a successful final server update: - send_state(jobs[i], args, jobs[i].state) - if jobs[i].pid: - logger.debug('killing payload processes') - kill_processes(jobs[i].pid) - - logger.info('monitor loop #%d: job %d:%s is in state \'%s\'', n, i, current_id, jobs[i].state) - if jobs[i].state == 'finished' or jobs[i].state == 'failed': - logger.info('will abort job monitoring soon since job state=%s (job is still in queue)', jobs[i].state) - if args.workflow == 'stager': # abort interactive stager pilot, this will trigger an abort of all threads - set_pilot_state(job=jobs[i], state="finished") - logger.info('ordering log transfer') - jobs[i].stageout = 'log' # only stage-out log file - put_in_queue(jobs[i], queues.data_out) - cont = False - # no_monitoring[current_id] = int(time.time()) - break - - # perform the monitoring tasks - exit_code, diagnostics = job_monitor_tasks(jobs[i], mt, args) - logger.debug(f'job_monitor_tasks returned {exit_code}, {diagnostics}') - if exit_code != 0: - # do a quick server update with the error diagnostics only - preliminary_server_update(jobs[i], args, diagnostics) - if exit_code == errors.VOMSPROXYABOUTTOEXPIRE: - # attempt to download a new proxy since it is about to expire - ec = download_new_proxy(role='production') - exit_code = ec if ec != 0 else 0 # reset the exit_code if success - if exit_code == errors.KILLPAYLOAD or exit_code == errors.NOVOMSPROXY or exit_code == errors.CERTIFICATEHASEXPIRED: - jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(exit_code) - logger.debug('killing payload process') - kill_process(jobs[i].pid) - break - elif exit_code == errors.LEASETIME: # stager mode, order log stage-out - set_pilot_state(job=jobs[i], state="finished") - logger.info('ordering log transfer') - jobs[i].stageout = 'log' # only stage-out log file - put_in_queue(jobs[i], queues.data_out) - elif exit_code == 0: - # ie if download of new proxy was successful - diagnostics = "" - break - else: - try: - fail_monitored_job(jobs[i], exit_code, diagnostics, queues, traces) - except Exception as error: - logger.warning('(1) exception caught: %s (job id=%s)', error, current_id) + try: + current_id = jobs[i].jobid + error_code = None + if abort_job and args.signal: + # if abort_job and a kill signal was set + error_code = get_signal_error(args.signal) + elif abort_job: # i.e. 
no kill signal + logger.info('tobekilled seen by job_monitor (error code should already be set) - abort job only') + elif os.environ.get('REACHED_MAXTIME', None): + # the batch system max time has been reached, time to abort (in the next step) + logger.info('REACHED_MAXTIME seen by job monitor - abort everything') + if not args.graceful_stop.is_set(): + logger.info('setting graceful_stop since it was not set already') + args.graceful_stop.set() + error_code = errors.REACHEDMAXTIME + if error_code: + jobs[i].state = 'failed' + jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(error_code) + jobs[i].completed = True + if not jobs[i].completed: # job.completed gets set to True after a successful final server update: + send_state(jobs[i], args, jobs[i].state) + if jobs[i].pid: + logger.debug('killing payload processes') + kill_processes(jobs[i].pid) + + logger.info('monitor loop #%d: job %d:%s is in state \'%s\'', n, i, current_id, jobs[i].state) + if jobs[i].state == 'finished' or jobs[i].state == 'failed': + logger.info('will abort job monitoring soon since job state=%s (job is still in queue)', jobs[i].state) + if args.workflow == 'stager': # abort interactive stager pilot, this will trigger an abort of all threads + set_pilot_state(job=jobs[i], state="finished") + logger.info('ordering log transfer') + jobs[i].stageout = 'log' # only stage-out log file + put_in_queue(jobs[i], queues.data_out) + cont = False + # no_monitoring[current_id] = int(time.time()) break - # run this check again in case job_monitor_tasks() takes a long time to finish (and the job object - # has expired in the meantime) - try: - _job = jobs[i] - except Exception: - logger.info('aborting job monitoring since job object (job id=%s) has expired', current_id) - break + # perform the monitoring tasks + exit_code, diagnostics = job_monitor_tasks(jobs[i], mt, args) + logger.debug(f'job_monitor_tasks returned {exit_code}, {diagnostics}') + if exit_code != 0: + # do a quick server update with the error diagnostics only + preliminary_server_update(jobs[i], args, diagnostics) + if exit_code == errors.VOMSPROXYABOUTTOEXPIRE: + # attempt to download a new proxy since it is about to expire + ec = download_new_proxy(role='production') + exit_code = ec if ec != 0 else 0 # reset the exit_code if success + if exit_code == errors.KILLPAYLOAD or exit_code == errors.NOVOMSPROXY or exit_code == errors.CERTIFICATEHASEXPIRED: + jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(exit_code) + logger.debug('killing payload process') + kill_process(jobs[i].pid) + break + elif exit_code == errors.LEASETIME: # stager mode, order log stage-out + set_pilot_state(job=jobs[i], state="finished") + logger.info('ordering log transfer') + jobs[i].stageout = 'log' # only stage-out log file + put_in_queue(jobs[i], queues.data_out) + elif exit_code == 0: + # ie if download of new proxy was successful + diagnostics = "" + break + else: + try: + fail_monitored_job(jobs[i], exit_code, diagnostics, queues, traces) + except Exception as error: + logger.warning('(1) exception caught: %s (job id=%s)', error, current_id) + break + + # run this check again in case job_monitor_tasks() takes a long time to finish (and the job object + # has expired in the meantime) + try: + _job = jobs[i] + except Exception: + logger.info('aborting job monitoring since job object (job id=%s) has expired', current_id) + break - # send heartbeat if it is time (note that the heartbeat function might update the job object, e.g. 
- # by turning on debug mode, ie we need to get the heartbeat period in case it has changed) - try: - update_time = send_heartbeat_if_time(_job, args, update_time) - except Exception as error: - logger.warning('exception caught: %s (job id=%s)', error, current_id) - break - else: - # note: when sending a state change to the server, the server might respond with 'tobekilled' - if _job.state == 'failed': - logger.warning('job state is \'failed\' - order log transfer and abort job_monitor() (2)') - _job.stageout = 'log' # only stage-out log file - put_in_queue(_job, queues.data_out) - #abort = True + # send heartbeat if it is time (note that the heartbeat function might update the job object, e.g. + # by turning on debug mode, ie we need to get the heartbeat period in case it has changed) + try: + update_time = send_heartbeat_if_time(_job, args, update_time) + except Exception as error: + logger.warning('exception caught: %s (job id=%s)', error, current_id) break + else: + # note: when sending a state change to the server, the server might respond with 'tobekilled' + if _job.state == 'failed': + logger.warning('job state is \'failed\' - order log transfer and abort job_monitor() (2)') + _job.stageout = 'log' # only stage-out log file + put_in_queue(_job, queues.data_out) + #abort = True + break + + except Exception as exc: + # did the job object expire? + logger.warning(f'exception caught: {exc}') + continue #if stop_monitoring: # continue diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index 7d467076..6479464b 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -417,8 +417,8 @@ def run_command(self, cmd, label=None): try: out = open(os.path.join(self.__job.workdir, self.__coprocess_stdout_name), 'wb') err = open(os.path.join(self.__job.workdir, self.__coprocess_stderr_name), 'wb') - except Exception as error: - logger.warning('failed to open coprocess stdout/err: %s', error) + except IOError as error: + logger.warning(f'failed to open coprocess stdout/err: {error}') out = None err = None else: diff --git a/pilot/info/dataloader.py b/pilot/info/dataloader.py index 65ba69ff..c416345f 100644 --- a/pilot/info/dataloader.py +++ b/pilot/info/dataloader.py @@ -5,7 +5,7 @@ # # Authors: # - Alexey Anisenkov, anisyonk@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2019 +# - Paul Nilsson, paul.nilsson@cern.ch, 2019-2023 """ Base loader class to retrive data from Ext sources (file, url) @@ -202,7 +202,7 @@ def merge_dict_data(d1, d2, keys=[], common=True, left=True, right=True, rec=Fal ### TODO: verify and configure logic later - if not (type(d1) == type(d2) and type(d1) is dict): + if not (isinstance(d1, dict) and isinstance(d2, dict)): return d2 ret = d1.copy() diff --git a/pilot/scripts/stagein.py b/pilot/scripts/stagein.py index e5fe93fb..748fbae4 100644 --- a/pilot/scripts/stagein.py +++ b/pilot/scripts/stagein.py @@ -405,7 +405,7 @@ def extract_error_info(errc): # generate the trace report trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=args.localsite, remoteSite=args.remotesite, dataset="", - eventType=args.eventtype) + eventType=args.eventtype, workdir=args.workdir) job = Job(produserid=args.produserid, jobid=args.jobid, taskid=args.taskid, jobdefinitionid=args.jobdefinitionid) trace_report.init(job) @@ -423,7 +423,7 @@ def extract_error_info(errc): client = StageInESClient(infoservice, logger=logger, trace_report=trace_report) activity = 'es_events_read' else: - client = 
StageInClient(infoservice, logger=logger, trace_report=trace_report) + client = StageInClient(infoservice, logger=logger, trace_report=trace_report, workdir=args.workdir) activity = 'pr' kwargs = dict(workdir=args.workdir, cwd=args.workdir, usecontainer=False, use_pcache=args.usepcache, use_bulk=False, use_vp=args.usevp, input_dir=args.inputdir, catchall=args.catchall, rucio_host=args.rucio_host) diff --git a/pilot/scripts/stageout.py b/pilot/scripts/stageout.py index 33a802d2..83b1fa00 100644 --- a/pilot/scripts/stageout.py +++ b/pilot/scripts/stageout.py @@ -311,7 +311,7 @@ def extract_error_info(err): # generate the trace report trace_report = TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), localSite=args.localsite, - remoteSite=args.remotesite, dataset="", eventType=args.eventtype) + remoteSite=args.remotesite, dataset="", eventType=args.eventtype, workdir=args.workdir) job = Job(produserid=args.produserid, jobid=args.jobid, taskid=args.taskid, jobdefinitionid=args.jobdefinitionid) trace_report.init(job) @@ -328,7 +328,7 @@ def extract_error_info(err): xfiles = None activity = 'pw' - client = StageOutClient(infoservice, logger=logger, trace_report=trace_report) + client = StageOutClient(infoservice, logger=logger, trace_report=trace_report, workdir=args.workdir) kwargs = dict(workdir=args.workdir, cwd=args.workdir, usecontainer=False, job=job, output_dir=args.outputdir, catchall=args.catchall, rucio_host=args.rucio_host) # , mode='stage-out') diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 0ef10027..75ca3907 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -360,6 +360,8 @@ def process_remote_file_traces(path, job, not_opened_turls): logger.warning('failed to read back base trace report (cannot send trace reports)') else: # update and send the trace info + if 'workdir' not in base_trace_report: + base_trace_report['workdir'] = job.workdir for fspec in job.indata: if fspec.status == 'remote_io': base_trace_report.update(url=fspec.turl) @@ -1641,7 +1643,7 @@ def get_outfiles_records(subfiles): } nentries = subfile.get('nentries', 'UNDEFINED') - if type(nentries) == int: + if isinstance(nentries, int): res[subfile['name']]['nentries'] = nentries else: logger.warning("nentries is undefined in job report") diff --git a/pilot/user/atlas/jobmetrics.py b/pilot/user/atlas/jobmetrics.py index 4dc40303..02b59dcb 100644 --- a/pilot/user/atlas/jobmetrics.py +++ b/pilot/user/atlas/jobmetrics.py @@ -12,9 +12,11 @@ import logging from pilot.api import analytics +from pilot.common.exception import FileHandlingFailure +from pilot.util.config import config from pilot.util.jobmetrics import get_job_metrics_entry from pilot.util.features import MachineFeatures, JobFeatures -from pilot.util.filehandling import find_last_line +from pilot.util.filehandling import find_last_line, read_file from pilot.util.math import float_to_rounded_string from .cpu import get_core_count @@ -24,11 +26,12 @@ logger = logging.getLogger(__name__) -def get_job_metrics_string(job): +def get_job_metrics_string(job, extra={}): """ Get the job metrics string. - :param job: job object. + :param job: job object + :param extra: any extra information to be added (dict) :return: job metrics (string). """ @@ -72,6 +75,11 @@ def get_job_metrics_string(job): else: logger.info("will not add max space = %d B to job metrics", max_space) + # is there a detected rucio trace service error? 
+ trace_exit_code = get_trace_exit_code(job.workdir) + if trace_exit_code != '0': + job_metrics += get_job_metrics_entry("rucioTraceError", trace_exit_code) + # add job and machine feature data if available job_metrics = add_features(job_metrics, corecount, add=['hs06']) @@ -86,9 +94,37 @@ def get_job_metrics_string(job): job_metrics += get_job_metrics_entry("schedulerIP", job.dask_scheduler_ip) job_metrics += get_job_metrics_entry("sessionIP", job.jupyter_session_ip) + # add any additional info + if extra: + for entry in extra: + job_metrics += get_job_metrics_entry(entry, extra.get(entry)) + return job_metrics +def get_trace_exit_code(workdir): + """ + Look for any rucio trace curl problems using an env var and a file. + + :param workdir: payload work directory (str) + :return: curl exit code (str). + """ + + trace_exit_code = os.environ.get('RUCIO_TRACE_ERROR', '0') + if trace_exit_code == '0': + # look for rucio_trace_error_file in case middleware container is used + path = os.path.join(workdir, config.Rucio.rucio_trace_error_file) + if os.path.exists(path): + try: + trace_exit_code = read_file(path) + except FileHandlingFailure as exc: + logger.warning(f'failed to read {path}: {exc}') + else: + logger.debug(f'read {trace_exit_code} from file {path}') + + return trace_exit_code + + def add_features(job_metrics, corecount, add=[]): """ Add job and machine feature data to the job metrics if available @@ -183,7 +219,7 @@ def add_event_number(job_metrics, workdir): return job_metrics -def get_job_metrics(job): +def get_job_metrics(job, extra={}): """ Return a properly formatted job metrics string. The format of the job metrics string is defined by the server. It will be reported to the server during updateJob. @@ -193,12 +229,13 @@ def get_job_metrics(job): Format: nEvents= nEventsW= vmPeakMax= vmPeakMean= RSSMean= hs06= shutdownTime= cpuFactor= cpuLimit= diskLimit= jobStart= memLimit= runLimit= - :param job: job object. + :param job: job object + :param extra: any extra information to be added (dict) :return: job metrics (string). """ # get job metrics string - job_metrics = get_job_metrics_string(job) + job_metrics = get_job_metrics_string(job, extra=extra) # correct for potential initial and trailing space job_metrics = job_metrics.lstrip().rstrip() diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py index 5dc50343..492673bb 100644 --- a/pilot/user/atlas/utilities.py +++ b/pilot/user/atlas/utilities.py @@ -162,12 +162,15 @@ def get_proper_pid(pid, pgrp, jobid, command="", transformation="", outdata="", ps = get_ps_info(pgrp) #logger.debug('ps:\n%s' % ps) + #_pid = os.getpid() + #logger.debug(f'current pid={_pid}') + #logger.debug(f'current ppid={os.getppid()}') # /bin/bash parent process (parent to pilot and prmon, ..) 
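The new extra dictionary gives callers, such as get_data_structure() in pilot/control/job.py above, a way to append additional key=value pairs (e.g. readbyterate) to the job metrics string, with get_job_metrics_entry() taking care of the formatting. A small sketch of that path with a hypothetical value; since extra is only read and never modified, the mutable default extra={} is harmless here, although extra=None with a fallback would be the more defensive signature:

from pilot.util.jobmetrics import get_job_metrics_entry

extra = {'readbyterate': '0.75'}  # hypothetical value calculated in pilot/control/job.py
job_metrics = ''
for entry in extra:
    job_metrics += get_job_metrics_entry(entry, extra.get(entry))
# job_metrics now holds an entry of the form 'readbyterate=0.75'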
# lookup the process id using ps aux logger.debug(f'attempting to identify pid from job id ({jobid})') _pid = get_pid_for_jobid(ps, jobid) if _pid: - logger.debug('discovered pid=%d for job id %s' % (_pid, jobid)) + logger.debug(f'discovered pid={_pid} for job id {jobid}') break #logger.debug('attempting to identify pid from transform name and its output') @@ -176,7 +179,7 @@ def get_proper_pid(pid, pgrp, jobid, command="", transformation="", outdata="", # logger.debug('discovered pid=%d for transform name \"%s\"' % (_pid, transformation)) # break - logger.warning('payload pid has not yet been identified (#%d/#%d)' % (i + 1, imax)) + logger.warning(f'payload pid has not yet been identified (#{i + 1}/#{imax})') # wait until the payload has launched time.sleep(5) @@ -185,7 +188,7 @@ def get_proper_pid(pid, pgrp, jobid, command="", transformation="", outdata="", if _pid: pid = _pid - logger.info('will use pid=%d for memory monitor' % pid) + logger.info(f'will use pid={pid} for memory monitor') return pid @@ -549,7 +552,7 @@ def get_average_summary_dictionary_prmon(path): def filter_value(value): """ Inline function used to remove any string or None values from data. """ - if type(value) == str or value is None: + if isinstance(value, str) or value is None: return False else: return True @@ -641,7 +644,7 @@ def convert_text_file_to_dictionary(path): _l = [_f for _f in _l.split('\t') if _f] # define dictionary keys - if type(_l[0]) == str and not header_locked: + if isinstance(_l[0], str) and not header_locked: summary_keys = _l for key in _l: dictionary[key] = [] diff --git a/pilot/user/generic/jobmetrics.py b/pilot/user/generic/jobmetrics.py index ec59877f..1ceceb6e 100644 --- a/pilot/user/generic/jobmetrics.py +++ b/pilot/user/generic/jobmetrics.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) -def get_job_metrics(job): +def get_job_metrics(job, extra={}): """ Return a properly formatted job metrics string. The format of the job metrics string is defined by the server. It will be reported to the server during updateJob. @@ -23,7 +23,8 @@ def get_job_metrics(job): Format: nEvents= nEventsW= vmPeakMax= vmPeakMean= RSSMean= hs06= shutdownTime= cpuFactor= cpuLimit= diskLimit= jobStart= memLimit= runLimit= - :param job: job object. + :param job: job object + :param extra: any extra information to be added (dict) :return: job metrics (string). """ diff --git a/pilot/user/rubin/jobmetrics.py b/pilot/user/rubin/jobmetrics.py index 3c9de969..abb4daef 100644 --- a/pilot/user/rubin/jobmetrics.py +++ b/pilot/user/rubin/jobmetrics.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) -def get_job_metrics(job): +def get_job_metrics(job, extra={}): """ Return a properly formatted job metrics string. The format of the job metrics string is defined by the server. It will be reported to the server during updateJob. @@ -23,7 +23,8 @@ def get_job_metrics(job): Format: nEvents= nEventsW= vmPeakMax= vmPeakMean= RSSMean= hs06= shutdownTime= cpuFactor= cpuLimit= diskLimit= jobStart= memLimit= runLimit= - :param job: job object. + :param job: job object + :param extra: any extra information to be added (dict) :return: job metrics (string). """ diff --git a/pilot/user/rubin/utilities.py b/pilot/user/rubin/utilities.py index 023517cd..020f5f35 100644 --- a/pilot/user/rubin/utilities.py +++ b/pilot/user/rubin/utilities.py @@ -250,7 +250,7 @@ def get_average_summary_dictionary_prmon(path): def filter_value(value): """ Inline function used to remove any string or None values from data. 
""" - if type(value) == str or value is None: + if isinstance(value, str) or value is None: return False else: return True @@ -342,7 +342,7 @@ def convert_text_file_to_dictionary(path): _l = [_f for _f in _l.split('\t') if _f] # define dictionary keys - if type(_l[0]) == str and not header_locked: + if isinstance(_l[0], str) and not header_locked: summary_keys = _l for key in _l: dictionary[key] = [] diff --git a/pilot/user/sphenix/jobmetrics.py b/pilot/user/sphenix/jobmetrics.py index 43b34663..816d6d10 100644 --- a/pilot/user/sphenix/jobmetrics.py +++ b/pilot/user/sphenix/jobmetrics.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) -def get_job_metrics(job): +def get_job_metrics(job, extra={}): """ Return a properly formatted job metrics string. The format of the job metrics string is defined by the server. It will be reported to the server during updateJob. @@ -23,7 +23,8 @@ def get_job_metrics(job): Format: nEvents= nEventsW= vmPeakMax= vmPeakMean= RSSMean= hs06= shutdownTime= cpuFactor= cpuLimit= diskLimit= jobStart= memLimit= runLimit= - :param job: job object. + :param job: job object + :param extra: any extra information to be added (dict) :return: job metrics (string). """ diff --git a/pilot/user/sphenix/utilities.py b/pilot/user/sphenix/utilities.py index 6bcc0409..d51cc0b4 100644 --- a/pilot/user/sphenix/utilities.py +++ b/pilot/user/sphenix/utilities.py @@ -527,7 +527,7 @@ def get_average_summary_dictionary_prmon(path): def filter_value(value): """ Inline function used to remove any string or None values from data. """ - if type(value) == str or value is None: + if isinstance(value, str) or value is None: return False else: return True @@ -619,7 +619,7 @@ def convert_text_file_to_dictionary(path): _l = [_f for _f in _l.split('\t') if _f] # define dictionary keys - if type(_l[0]) == str and not header_locked: + if isinstance(_l[0], str) and not header_locked: summary_keys = _l for key in _l: dictionary[key] = [] diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 687562c9..8d1433f9 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -13,8 +13,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '10' # build number should be reset to '1' for every new development cycle +REVISION = '8' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '29' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index a5da7f18..88b77699 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -15,10 +15,10 @@ from os import environ, getcwd, getpgid, kill #, setpgrp, getpgid #setsid from time import sleep from signal import SIGTERM, SIGKILL -from typing import Any +from typing import Any, TextIO from pilot.common.errorcodes import ErrorCodes -from pilot.util.loggingsupport import flush_handler +#from pilot.util.loggingsupport import flush_handler from pilot.util.processgroups import kill_process_group logger = logging.getLogger(__name__) @@ -71,6 +71,7 @@ def execute(executable: Any, **kwargs: dict) -> Any: exit_code = 0 stdout = '' stderr = '' + # Acquire the lock before creating the subprocess with execute_lock: process = 
subprocess.Popen(exe, @@ -89,17 +90,27 @@ def execute(executable: Any, **kwargs: dict) -> Any: stdout, stderr = process.communicate(timeout=timeout) except subprocess.TimeoutExpired as exc: # make sure that stdout buffer gets flushed - in case of time-out exceptions - flush_handler(name="stream_handler") + # flush_handler(name="stream_handler") stderr += f'subprocess communicate sent TimeoutExpired: {exc}' logger.warning(stderr) exit_code = errors.COMMANDTIMEDOUT stderr = kill_all(process, stderr) + except Exception as exc: + logger.warning(f'exception caused when executing command: {executable}: {exc}') + exit_code = errors.UNKNOWNEXCEPTION + stderr = kill_all(process, str(exc)) else: exit_code = process.poll() # wait for the process to finish # (not strictly necessary when process.communicate() is used) - process.wait() + try: + # wait for the process to complete with a timeout of 60 seconds + process.wait(timeout=60) + except subprocess.TimeoutExpired: + # Handle the case where the process did not complete within the timeout + print("process did not complete within the timeout of 60s - terminating") + process.terminate() # remove any added \n if stdout and stdout.endswith('\n'): @@ -108,6 +119,66 @@ def execute(executable: Any, **kwargs: dict) -> Any: return exit_code, stdout, stderr +def execute2(executable: Any, stdout_file: TextIO, stderr_file: TextIO, timeout_seconds: int, **kwargs: dict) -> int: + + exit_code = None + + def _timeout_handler(): + # This function is called when the timeout occurs + nonlocal exit_code # Use nonlocal to modify the outer variable + logger.warning("subprocess execution timed out") + exit_code = -2 + process.terminate() # Terminate the subprocess if it's still running + logger.info(f'process terminated after {timeout_seconds}s') + + obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message + if not kwargs.get('mute', False): + print_executable(executable, obscure=obscure) + + exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] + + # Create the subprocess with stdout and stderr redirection to files + process = subprocess.Popen(exe, + stdout=stdout_file, + stderr=stderr_file, + cwd=kwargs.get('cwd', os.getcwd()), + preexec_fn=os.setsid, + encoding='utf-8', + errors='replace') + + # Set up a timer for the timeout + timeout_timer = threading.Timer(timeout_seconds, _timeout_handler) + + try: + # Start the timer + timeout_timer.start() + + # wait for the process to finish + try: + # wait for the process to complete with a timeout (this will likely never happen since a timer is used) + process.wait(timeout=timeout_seconds + 10) + except subprocess.TimeoutExpired: + # Handle the case where the process did not complete within the timeout + timeout_seconds = timeout_seconds + 10 + logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating") + exit_code = -2 + process.terminate() + except Exception as exc: + logger.warning(f'execution caught: {exc}') + finally: + # Cancel the timer to avoid it firing after the subprocess has completed + timeout_timer.cancel() + + if exit_code == -2: + # the process was terminated due to a time-out + exit_code = errors.COMMANDTIMEDOUT + else: + # get the exit code after a normal finish + exit_code = process.returncode + + return exit_code + + def get_timeout(requested_timeout): """ Define the timeout to be used with subprocess.communicate(). 
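execute2() differs from execute() in two ways: stdout and stderr go straight to caller-provided file objects instead of being buffered in memory, and the timeout is enforced with a threading.Timer on top of process.wait(). One point worth double-checking is that the visible hunks do not add import os or import threading to pilot/util/container.py, which execute2() needs for os.getcwd()/os.setsid and threading.Timer; presumably those imports exist elsewhere in the module. A stripped-down, standalone sketch of the same pattern (without the pilot error codes):

import os
import subprocess
import threading


def run_with_timeout(cmd: str, out, err, timeout_seconds: int) -> int:
    """Run cmd via bash, stream its output to the given file objects, terminate it on timeout."""
    process = subprocess.Popen(['/bin/bash', '-c', cmd],
                               stdout=out, stderr=err,
                               preexec_fn=os.setsid,
                               encoding='utf-8', errors='replace')
    timed_out = False

    def _on_timeout():
        nonlocal timed_out
        timed_out = True
        process.terminate()  # stop the subprocess if it is still running

    timer = threading.Timer(timeout_seconds, _on_timeout)
    timer.start()
    try:
        # normally returns long before the timer fires
        process.wait(timeout=timeout_seconds + 10)
    except subprocess.TimeoutExpired:
        timed_out = True
        process.terminate()
    finally:
        timer.cancel()  # make sure the timer cannot fire after completion

    return -2 if timed_out else process.returncode


# usage sketch (hypothetical file names):
# with open('cmd.stdout', 'wb') as out, open('cmd.stderr', 'wb') as err:
#     exit_code = run_with_timeout('echo hello', out, err, timeout_seconds=30)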
@@ -158,18 +229,24 @@ def kill_all(process: Any, stderr: str) -> str: try: logger.warning('killing lingering subprocess and process group') - process.kill() + sleep(1) + # process.kill() kill_process_group(getpgid(process.pid)) except ProcessLookupError as exc: stderr += f'\n(kill process group) ProcessLookupError={exc}' + except Exception as exc: + stderr += f'\n(kill_all 1) exception caught: {exc}' try: logger.warning('killing lingering process') + sleep(1) kill(process.pid, SIGTERM) logger.warning('sleeping a bit before sending SIGKILL') sleep(10) kill(process.pid, SIGKILL) except ProcessLookupError as exc: stderr += f'\n(kill process) ProcessLookupError={exc}' + except Exception as exc: + stderr += f'\n(kill_all 2) exception caught: {exc}' logger.warning(f'sent soft kill signals - final stderr: {stderr}') return stderr diff --git a/pilot/util/default.cfg b/pilot/util/default.cfg index bda415fc..eb8307e3 100644 --- a/pilot/util/default.cfg +++ b/pilot/util/default.cfg @@ -303,6 +303,9 @@ checksum_type: adler32 # Rucio server URL for traces url: https://rucio-lb-prod.cern.ch/traces/ +# Error info file in case of curl error +rucio_trace_error_file: rucio_trace_error.txt + # Rucio host host: https://voatlasrucio-server-prod.cern.ch:443 diff --git a/pilot/util/filehandling.py b/pilot/util/filehandling.py index 766ad25e..98cc9223 100644 --- a/pilot/util/filehandling.py +++ b/pilot/util/filehandling.py @@ -17,15 +17,16 @@ import time import uuid from collections.abc import Iterable, Mapping +from functools import partial, reduce from glob import glob from json import load, JSONDecodeError from json import dump as dumpjson +from mmap import mmap from pathlib import Path from shutil import copy2, rmtree -from zlib import adler32 -from functools import partial -from mmap import mmap +from typing import Any from zipfile import ZipFile, ZIP_DEFLATED +from zlib import adler32 from pilot.common.exception import ConversionFailure, FileHandlingFailure, MKDirFailure, NoSuchFile from .container import execute @@ -1256,3 +1257,62 @@ def get_directory_size(directory: str) -> float: logger.warning(f'failed to convert {match.group(1)} to float: {exc}') # path = match.group(2) return size_mb + + +def get_total_input_size(files: Any, nolib: bool = True) -> int: + """ + Calculate the total input file size, but do not include the lib file if present. + + :param files: files object (list of FileSpec) + :param nolib: if True, do not include the lib file in the calculation + :return: total input file size in bytes (int). + """ + + if not nolib: + total_size = reduce(lambda x, y: x + y.filesize, files, 0) + else: + total_size = 0 + for _file in files: + if nolib and '.lib.' not in _file.lfn: + total_size += _file.filesize + + return total_size + + +def append_to_file(from_file: str, to_file: str) -> bool: + """ + Appends the contents of one file to another. + + :param from_file: The path to the source file to read from (str) + :param to_file: The path to the target file to append to (str) + :return: True if the operation was successful, False otherwise (bool). 
+ """ + + status = False + try: + # 1 kB chunk size + chunk_size = 1024 + + # Open the source file in read mode + with open(from_file, 'r') as source_file: + # Open the target file in append mode + with open(to_file, 'a') as target_file: + while True: + # Read a chunk from the source file + chunk = source_file.read(chunk_size) + if not chunk: + target_file.write('--------------------------------------\n') + break # Reached the end of the source file + + # Write the chunk to the target file + target_file.write(chunk) + + status = True + + except FileNotFoundError as exc: + logger.warning(f"file not found: {exc}") + + except IOError as exc: + logger.warning(f"an error occurred while processing the file: {exc}") + + return status diff --git a/pilot/util/jobmetrics.py b/pilot/util/jobmetrics.py index a4f6be22..50f5c577 100644 --- a/pilot/util/jobmetrics.py +++ b/pilot/util/jobmetrics.py @@ -30,7 +30,7 @@ def get_job_metrics_entry(name, value): return job_metrics_entry -def get_job_metrics(job): +def get_job_metrics(job, extra={}): """ Return a properly formatted job metrics string. Job metrics are highly user specific, so this function merely calls a corresponding get_job_metrics() in the @@ -42,7 +42,8 @@ def get_job_metrics(job): Format: nEvents= nEventsW= vmPeakMax= vmPeakMean= RSSMean= hs06= shutdownTime= cpuFactor= cpuLimit= diskLimit= jobStart= memLimit= runLimit= - :param job: job object. + :param job: job object + :param extra: any extra information to be added (dict) :return: job metrics (string). """ @@ -53,6 +54,6 @@ def get_job_metrics(job): job_metrics = None logger.warning(f'function not implemented in jobmetrics module: {exc}') else: - job_metrics = job_metrics_module.get_job_metrics(job) + job_metrics = job_metrics_module.get_job_metrics(job, extra=extra) return job_metrics diff --git a/pilot/util/math.py b/pilot/util/math.py index 1f32338f..a05fbf86 100644 --- a/pilot/util/math.py +++ b/pilot/util/math.py @@ -86,6 +86,9 @@ def float_to_rounded_string(num, precision=3): Convert float to a string with a desired number of digits (the precision). E.g. num=3.1415, precision=2 -> '3.14'. + round_to_n = lambda x, n: x if x == 0 else round(x, -int(math.floor(math.log10(abs(x)))) + (n - 1)) + round_to_n(x=0.123,n=2) + 0.12 :param num: number to be converted (float). :param precision: number of desired digits (int) :raises NotDefined: for undefined precisions and float conversions to Decimal. 
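get_total_input_size() and float_to_rounded_string() are the two helpers behind the new readbyterate job metric added to get_data_structure() in pilot/control/job.py above: the maximum read_bytes reported by the memory monitor is divided by the total non-lib input size. A sketch with stubbed file specs and a hypothetical totRBYTES value (only lfn and filesize are needed, so a namedtuple stands in for FileSpec):

from collections import namedtuple

from pilot.util.filehandling import get_total_input_size
from pilot.util.math import float_to_rounded_string

FileSpecStub = namedtuple('FileSpecStub', ['lfn', 'filesize'])
indata = [FileSpecStub('EVNT.simulated.root', 2000000000),
          FileSpecStub('panda.lib.tgz', 500000000)]  # the lib file is skipped when nolib=True

total_size = get_total_input_size(indata, nolib=True)  # 2000000000
tot_rbytes = 1500000000  # hypothetical max read_bytes reported by the memory monitor
readfrac = float_to_rounded_string(tot_rbytes / total_size, precision=2)
# readfrac is '0.75' and is reported as readbyterate in the job metrics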
diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index b7a098bb..ae2a367c 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -11,7 +11,6 @@ import os import time -import re import subprocess from glob import glob from typing import Any @@ -46,7 +45,10 @@ get_subprocesses, reap_zombies ) -from pilot.util.psutils import is_process_running +from pilot.util.psutils import ( + is_process_running, + get_pid +) from pilot.util.timing import get_time_since from pilot.util.workernode import ( get_local_disk_space, @@ -91,10 +93,6 @@ def job_monitor_tasks(job, mt, args): # noqa: C901 # confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU) check_hz() - # verify that the process is still alive (again) - if not still_running(job.pid): - return 0, "" - try: cpuconsumptiontime = get_current_cpu_consumption_time(job.pid) except Exception as error: @@ -535,28 +533,12 @@ def utility_monitor(job): # noqa: C901 if utcmd == 'MemoryMonitor': if len(job.utilities[utcmd]) < 4: # only proceed if the pid has not been appended to the list already - try: - _ps = subprocess.run(['ps', 'aux', str(os.getpid())], stdout=subprocess.PIPE, - stderr=subprocess.PIPE, text=True, check=True, encoding='utf-8') - prmon = f'prmon --pid {job.pid}' - pid = None - pattern = r'\b\d+\b' - for line in _ps.stdout.split('\n'): - # line=atlprd55 16451 0.0 0.0 2944 1148 ? SN 17:42 0:00 prmon --pid 13096 .. - if prmon in line and f';{prmon}' not in line: # ignore the line that includes the setup - matches = re.findall(pattern, line) - if matches: - pid = matches[0] - logger.info(f'extracting prmon pid from line: {line}') - break - if pid: - logger.info(f'{prmon} command has pid={pid} (appending to cmd dictionary)') - job.utilities[utcmd].append(pid) - else: - logger.info(f'could not extract any pid from ps for cmd={prmon}') - - except subprocess.CalledProcessError as exc: - logger.warning(f"error: {exc}") + pid = get_pid(job.pid) + if pid: + logger.info(f'memory monitor command has pid {pid} (appending to cmd dictionary)') + job.utilities[utcmd].append(pid) + else: + logger.info('could not find any pid for memory monitor command') # make sure the subprocess is still running if not utproc.poll() is None: diff --git a/pilot/util/processes.py b/pilot/util/processes.py index be7869e5..398c22f2 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -551,7 +551,7 @@ def get_instant_cpu_consumption_time(pid): cstime = None hz = os.sysconf(os.sysconf_names['SC_CLK_TCK']) - if type(hz) != int: + if not isinstance(hz, int): logger.warning('unknown SC_CLK_TCK: %s', str(hz)) return 0.0 diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index 94d4bc7b..754a71be 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -7,14 +7,16 @@ # Authors: # - Paul Nilsson, paul.nilsson@cern.ch, 2023 +from re import findall import os +import subprocess try: import psutil except ImportError: print('FAILED; psutil module could not be imported') - is_psutil_available = False + _is_psutil_available = False else: - is_psutil_available = True + _is_psutil_available = True # from pilot.common.exception import MiddlewareImportFailure @@ -37,10 +39,88 @@ def is_process_running(pid): :raises: MiddlewareImportFailure if psutil module is not available. 
""" - if not is_psutil_available: + if not _is_psutil_available: is_running = is_process_running_by_pid(pid) logger.warning(f'using /proc/{pid} instead of psutil (is_running={is_running})') return is_running # raise MiddlewareImportFailure("required dependency could not be imported: psutil") else: return psutil.pid_exists(pid) + + +def get_pid(jobpid): + """ + Try to figure out the pid for the memory monitoring tool. + Attempt to use psutil, but use a fallback to ps-command based code if psutil is not available. + + :param jobpid: job.pid (int) + :return: pid (int|None). + """ + + pid = None + + if _is_psutil_available: + pid = find_pid_by_command_and_ppid('prmon', jobpid) + else: + try: + _ps = subprocess.run(['ps', 'aux', str(os.getpid())], stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True, check=True, encoding='utf-8') + prmon = f'prmon --pid {jobpid}' + pid = None + pattern = r'\b\d+\b' + for line in _ps.stdout.split('\n'): + # line=atlprd55 16451 0.0 0.0 2944 1148 ? SN 17:42 0:00 prmon --pid 13096 .. + if prmon in line and f';{prmon}' not in line: # ignore the line that includes the setup + matches = findall(pattern, line) + if matches: + pid = matches[0] + logger.info(f'extracting prmon pid from line: {line}') + break + + except subprocess.CalledProcessError as exc: + logger.warning(f"error: {exc}") + + return pid + + +def find_pid_by_command_and_ppid(command, payload_pid): + """ + Find the process id corresponding to the given command, and ensure that it belongs to the given payload. + + :param command: command (string) + :param payload_pid: payload process id (int) + :return: process id (int) or None + """ + + if not _is_psutil_available: + logger.warning('find_pid_by_command_and_ppid(): psutil not available - aborting') + return None + + for process in psutil.process_iter(['pid', 'name', 'cmdline', 'ppid']): + try: + # Check if the process has a cmdline attribute (command-line arguments) + # cmdline = cmdline=['prmon', '--pid', '46258', '--filename', 'memory_monitor_output.txt', '--json-summary', + # 'memory_monitor_summary.json', '--interval', '60'] pid=54481 ppid=46487 name=prmon parent_pid=2840 + if process.info['cmdline'] and (command in process.info['cmdline'][0] and process.info['cmdline'][2] == str(payload_pid)): + logger.debug(f"command={command} is in {process.info['cmdline'][0]}") + logger.debug(f"ok returning pid={process.info['pid']}") + return process.info['pid'] + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + return None + + +def get_parent_pid(pid): + """ + Return the parent process id for the given pid. + + :param pid: process id (int) + :return: parent process id (int or None). 
+ """ + + try: + process = psutil.Process(pid) + parent_pid = process.ppid() + return parent_pid + except psutil.NoSuchProcess: + return None diff --git a/pilot/util/tracereport.py b/pilot/util/tracereport.py index d2dfb134..007835cd 100644 --- a/pilot/util/tracereport.py +++ b/pilot/util/tracereport.py @@ -6,20 +6,21 @@ # Authors: # - Alexey Anisenkov, alexey.anisenkov@cern.ch, 2017 # - Pavlo Svirin, pavlo.svirin@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2018 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023 import hashlib import os import socket import time from sys import exc_info -from json import dumps #, loads +from json import dumps from os import environ, getuid from pilot.util.config import config from pilot.util.constants import get_pilot_version, get_rucio_client_version -from pilot.util.container import execute -#from pilot.util.https import request +from pilot.util.container import execute, execute2 +from pilot.common.exception import FileHandlingFailure +from pilot.util.filehandling import append_to_file, write_file import logging logger = logging.getLogger(__name__) @@ -28,6 +29,7 @@ class TraceReport(dict): ipv = 'IPv6' + workdir = '' def __init__(self, *args, **kwargs): @@ -67,6 +69,7 @@ def __init__(self, *args, **kwargs): super(TraceReport, self).__init__(defs) self.update(dict(*args, **kwargs)) # apply extra input self.ipv = kwargs.get('ipv', 'IPv6') # ipv (internet protocol version) is needed below for the curl command, but should not be included in the report + self.workdir = kwargs.get('workdir', '') # workdir is needed for streaming the curl output, but should not be included in the report # sitename, dsname, eventType def init(self, job): @@ -109,7 +112,10 @@ def init(self, job): def get_value(self, key): """ + Return trace report value for given key. + :param key: key (str) + :return: trace report value (Any). 
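The ipv and workdir values end up as keys in the report dictionary (they arrive via kwargs and are applied by self.update() in __init__) but are internal to the pilot, so send() strips them from the serialized JSON with string replacement before posting the trace. An ordering-insensitive alternative would be to filter the keys before dumping; this is only a sketch of that idea, not what the pilot does:

from json import dumps


def serialize_trace(trace: dict, internal_keys=('ipv', 'workdir')) -> str:
    # drop pilot-internal keys before the report is sent to the trace server
    return dumps({key: value for key, value in trace.items() if key not in internal_keys})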
""" return self.get(key, None) @@ -158,11 +164,14 @@ def send(self): logger.warning('cannot send trace since not all fields are set') return False + out = None + err = None try: # take care of the encoding data = dumps(self).replace('"', '\\"') - # remove the ipv item since it's for internal pilot use only + # remove the ipv and workdir items since they are for internal pilot use only data = data.replace(f'\"ipv\": \"{self.ipv}\", ', '') + data = data.replace(f'\"workdir\": \"{self.workdir}\", ', '') ssl_certificate = self.get_ssl_certificate() @@ -171,11 +180,43 @@ def send(self): if self.ipv == 'IPv4': command += ' -4' - cmd = f'{command} --connect-timeout 20 --max-time 120 --cacert {ssl_certificate} -v -k -d \"{data}\" {url}' - exit_code, stdout, stderr = execute(cmd, mute=False, timeout=300) - logger.debug(f'exit_code={exit_code}, stdout={stdout}, stderr={stderr}') - if exit_code or 'ExceptionClass' in stdout: - logger.warning('failed to send traces to rucio: %s' % stdout) + # stream the output to files to prevent massive reponses that could overwhelm subprocess.communicate() in execute() + outname, errname = self.get_trace_curl_filenames(name='trace_curl_last') + out, err = self.get_trace_curl_files(outname, errname) + logger.debug(f'using {outname} and {errname} to store curl output') + cmd = f'{command} --connect-timeout 100 --max-time 120 --cacert {ssl_certificate} -v -k -d \"{data}\" {url}' + exit_code = execute2(cmd, out, err, 300) + logger.debug(f'exit_code={exit_code}') + + # always append the output to trace_curl.std{out|err} + outname_final, errname_final = self.get_trace_curl_filenames(name='trace_curl') + _ = append_to_file(outname, outname_final) + _ = append_to_file(errname, errname_final) + self.close(out, err) + + # handle errors that only appear in stdout/err (curl) + if not exit_code: + out, err = self.get_trace_curl_files(outname, errname, mode='r') + exit_code = self.assign_error(out) + if not exit_code: + exit_code = self.assign_error(err) + logger.debug(f'curl exit_code from stdout/err={exit_code}') + self.close(out, err) + if not exit_code: + logger.info('no errors were detected from curl operation') + else: + # better to store exit code in file since env var will not be seen outside container in case middleware + # container is used + path = os.path.join(self.workdir, config.Rucio.rucio_trace_error_file) + try: + write_file(path, str(exit_code)) + except FileHandlingFailure as exc: + logger.warning(f'failed to store curl exit code to file: {exc}') + else: + logger.info(f'wrote rucio trace exit code {exit_code} to file {path}') + logger.debug(f"setting env var RUCIO_TRACE_ERROR to \'{exit_code}\' to be sent with job metrics") + os.environ['RUCIO_TRACE_ERROR'] = str(exit_code) + except Exception: # if something fails, log it but ignore logger.error('tracing failed: %s' % str(exc_info())) @@ -184,6 +225,70 @@ def send(self): return True + def close(self, out, err): + """ + Close all open file streams. + """ + + if out: + out.close() + if err: + err.close() + + def assign_error(self, out): + """ + Browse the stdout from curl line by line and look for errors. 
+ """ + + exit_code = 0 + count = 0 + while True: + count += 1 + + # Get next line from file + line = out.readline() + + # if line is empty + # end of file is reached + if not line: + break + if 'ExceptionClass' in line.strip(): + logger.warning(f'curl failure: {line.strip()}') + exit_code = 1 + break + + return exit_code + + def get_trace_curl_filenames(self, name='trace_curl'): + """ + Return file names for the curl stdout and stderr. + + :param name: name pattern (str) + :return: stdout file name (str), stderr file name (str). + """ + + workdir = self.workdir if self.workdir else os.getcwd() + return os.path.join(workdir, f'{name}.stdout'), os.path.join(workdir, f'{name}.stderr') + + def get_trace_curl_files(self, outpath, errpath, mode='wb'): + """ + Return file objects for the curl stdout and stderr. + + :param outpath: path for stdout (str) + :param errpath: path for stderr (str) + :return: out (file), err (file). + """ + + try: + out = open(outpath, mode=mode) + err = open(errpath, mode=mode) + except IOError as error: + logger.warning(f'failed to open curl stdout/err: {error}') + out = None + err = None + + return out, err + def get_ssl_certificate(self): """ Return the path to the SSL certificate
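Taken together, the rucio-trace error handling added in this version works as follows: TraceReport.send() streams the curl output to trace_curl_last.stdout/.stderr in the work directory, appends it to the cumulative trace_curl.stdout/.stderr files, and on failure stores the exit code both in the RUCIO_TRACE_ERROR environment variable and in the rucio_trace_error.txt file configured in default.cfg; get_trace_exit_code() in pilot/user/atlas/jobmetrics.py later reads the value back (the file covers the middleware-container case, where the environment variable is not visible) and a non-zero value is reported as rucioTraceError in the job metrics. A small sketch of the read-back step with a hypothetical exit code:

import os

from pilot.user.atlas.jobmetrics import get_trace_exit_code

os.environ['RUCIO_TRACE_ERROR'] = '56'  # hypothetical exit code stored by TraceReport.send()
trace_exit_code = get_trace_exit_code('/path/to/job/workdir')  # hypothetical work directory
# trace_exit_code is '56'; get_job_metrics_string() then adds rucioTraceError=56 to the job metrics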