diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index e18191eea..33d992900 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -11,7 +11,7 @@ jobs: continue-on-error: true strategy: matrix: - python-version: ['3.8', '3.9', '3.10'] + python-version: ['3.8', '3.9', '3.10', '3.11'] env: FLAKE8_VERSION: "==3.8.4" FLAKE8_CONFIG: ".flake8" diff --git a/PILOTVERSION b/PILOTVERSION index 7d1be157b..2a355da78 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.4.7 \ No newline at end of file +3.6.5.32 \ No newline at end of file diff --git a/pilot.py b/pilot.py index 778ae7262..f74face46 100755 --- a/pilot.py +++ b/pilot.py @@ -54,6 +54,7 @@ https_setup, send_update, ) +from pilot.util.processgroups import find_defunct_subprocesses from pilot.util.loggingsupport import establish_logging from pilot.util.timing import add_to_pilot_timing @@ -666,6 +667,20 @@ def set_redirectall() -> None: args.redirectall = redirectall +def list_zombies() -> None: + """ + Make sure there are no remaining defunct processes still lingering. + + Note: can be used to find zombies, but zombies can't be killed.. + """ + + found = find_defunct_subprocesses(os.getpid()) + if found: + logging.info(f'found these defunct processes: {found}') + else: + logging.info('no defunct processes were found') + + if __name__ == '__main__': # Main function of pilot module. @@ -713,6 +728,10 @@ def set_redirectall() -> None: # store final time stamp (cannot be placed later since the mainworkdir is about to be purged) add_to_pilot_timing('0', PILOT_END_TIME, time.time(), args, store=False) + # make sure the pilot does not leave any lingering defunct child processes behind + if args.debug: + list_zombies() + # perform cleanup and terminate logging exit_code = wrap_up() diff --git a/pilot/api/data.py b/pilot/api/data.py index 5fbfe5cc8..3e014b588 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -40,7 +40,7 @@ from pilot.util.math import convert_mb_to_b from pilot.util.parameters import get_maximum_input_sizes from pilot.util.workernode import get_local_disk_space -from pilot.util.timer import TimeoutException +from pilot.util.auxiliary import TimeoutException from pilot.util.tracereport import TraceReport diff --git a/pilot/common/errorcodes.py b/pilot/common/errorcodes.py index 93ccf07a1..5af25abbd 100644 --- a/pilot/common/errorcodes.py +++ b/pilot/common/errorcodes.py @@ -159,6 +159,8 @@ class ErrorCodes: APPTAINERNOTINSTALLED = 1372 CERTIFICATEHASEXPIRED = 1373 REMOTEFILEDICTDOESNOTEXIST = 1374 + LEASETIME = 1375 + LOGCREATIONTIMEOUT = 1376 _error_messages = { GENERALERROR: "General pilot error, consult batch log", @@ -294,7 +296,9 @@ class ErrorCodes: VOMSPROXYABOUTTOEXPIRE: "VOMS proxy is about to expire", BADOUTPUTFILENAME: "Output file name contains illegal characters", CERTIFICATEHASEXPIRED: "Certificate has expired", - REMOTEFILEDICTDOESNOTEXIST: "Remote file open dictionary does not exist" + REMOTEFILEDICTDOESNOTEXIST: "Remote file open dictionary does not exist", + LEASETIME: "Lease time is up", # internal use only + LOGCREATIONTIMEOUT: "Log file creation timed out" } put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181] diff --git a/pilot/control/data.py b/pilot/control/data.py index 2441e77a9..d0f33e979 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -7,7 +7,7 @@ # Authors: # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022 +# - 
Paul Nilsson, paul.nilsson@cern.ch, 2017-2023 # - Wen Guan, wen.guan@cern.ch, 2018 # - Alexey Anisenkov, anisyonk@cern.ch, 2018 @@ -18,21 +18,53 @@ import queue from typing import Any -from pilot.api.data import StageInClient, StageOutClient +from pilot.api.data import ( + StageInClient, + StageOutClient +) from pilot.api.es_data import StageInESClient from pilot.control.job import send_state from pilot.common.errorcodes import ErrorCodes -from pilot.common.exception import ExcThread, PilotException, LogFileCreationFailure, NoSuchFile, FileHandlingFailure -#from pilot.util.config import config -from pilot.util.auxiliary import set_pilot_state, check_for_final_server_update #, abort_jobs_in_queues +from pilot.common.exception import ( + ExcThread, + PilotException, + LogFileCreationFailure, + NoSuchFile, + FileHandlingFailure +) +from pilot.util.auxiliary import ( + set_pilot_state, + check_for_final_server_update +) from pilot.util.common import should_abort from pilot.util.config import config -from pilot.util.constants import PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT, LOG_TRANSFER_IN_PROGRESS,\ - LOG_TRANSFER_DONE, LOG_TRANSFER_NOT_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_RUNNING, MAX_KILL_WAIT_TIME, UTILITY_BEFORE_STAGEIN +from pilot.util.constants import ( + PILOT_PRE_STAGEIN, + PILOT_POST_STAGEIN, + PILOT_PRE_STAGEOUT, + PILOT_POST_STAGEOUT, + PILOT_PRE_LOG_TAR, + PILOT_POST_LOG_TAR, + LOG_TRANSFER_IN_PROGRESS, + LOG_TRANSFER_DONE, + LOG_TRANSFER_NOT_DONE, + LOG_TRANSFER_FAILED, + SERVER_UPDATE_RUNNING, + MAX_KILL_WAIT_TIME, + UTILITY_BEFORE_STAGEIN +) from pilot.util.container import execute -from pilot.util.filehandling import remove, write_file, copy +from pilot.util.filehandling import ( + remove, + write_file, + copy, + get_directory_size +) from pilot.util.processes import threads_aborted -from pilot.util.queuehandling import declare_failed_by_kill, put_in_queue +from pilot.util.queuehandling import ( + declare_failed_by_kill, + put_in_queue +) from pilot.util.timing import add_to_pilot_timing from pilot.util.tracereport import TraceReport import pilot.util.middleware @@ -592,6 +624,9 @@ def copytool_in(queues, traces, args): # noqa: C901 # logger.info(f"job {job.jobid} has finished") # put_in_queue(job, queues.finished_jobs) + # this job is now to be monitored, so add it to the monitored_payloads queue + put_in_queue(job, queues.monitored_payloads) + logger.info('stage-in thread is no longer needed - terminating') abort = True break @@ -788,18 +823,18 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out # perform special cleanup (user specific) prior to log file creation if cleanup: pilot_user = os.environ.get('PILOT_USER', 'generic').lower() - user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3 + user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) # Python 2/3 user.remove_redundant_files(workdir, piloterrors=piloterrors, debugmode=debugmode) # remove any present input/output files before tarring up workdir for fname in input_files + output_files: path = os.path.join(workdir, fname) if os.path.exists(path): - logger.info('removing file: %s', path) + logger.info(f'removing file: {path}') remove(path) if logfile_name is None or len(logfile_name.strip('/ ')) == 0: - logger.info('Skipping tarball creation, since the logfile_name is empty') + logger.info('skipping tarball creation, since the logfile_name is empty') return # 
rename the workdir for the tarball creation @@ -808,21 +843,34 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out os.rename(workdir, newworkdir) workdir = newworkdir + # get the size of the workdir + dirsize = get_directory_size(workdir) + timeout = get_tar_timeout(dirsize) + fullpath = os.path.join(current_dir, logfile_name) # /some/path/to/dirname/log.tgz - logger.info('will create archive %s', fullpath) + logger.info(f'will create archive {fullpath} using timeout={timeout} s for directory size={dirsize} MB') + try: - cmd = "pwd;tar cvfz %s %s --dereference --one-file-system; echo $?" % (fullpath, tarball_name) - _, stdout, _ = execute(cmd) + # add e.g. sleep 200; before tar command to test time-out + cmd = f"pwd;tar cvfz {fullpath} {tarball_name} --dereference --one-file-system; echo $?" + exit_code, stdout, stderr = execute(cmd, timeout=timeout) except Exception as error: raise LogFileCreationFailure(error) else: if pilot_home != current_dir: os.chdir(pilot_home) - logger.debug('stdout = %s', stdout) + logger.debug(f'stdout: {stdout}') try: os.rename(workdir, orgworkdir) - except Exception as error: - logger.debug('exception caught when renaming workdir: %s', error) + except OSError as error: + logger.debug(f'exception caught when renaming workdir: {error} (ignore)') + + if exit_code: + diagnostics = f'tarball creation failed with exit code: {exit_code}, stdout={stdout}, stderr={stderr}' + logger.warning(diagnostics) + if exit_code == errors.COMMANDTIMEDOUT: + exit_code = errors.LOGCREATIONTIMEOUT + raise PilotException(diagnostics, code=exit_code) # final step, copy the log file into the workdir - otherwise containerized stage-out won't work try: @@ -831,6 +879,22 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out logger.warning(f'caught exception when copying tarball: {exc}') +def get_tar_timeout(dirsize: float) -> int: + """ + Get a proper time-out limit based on the directory size. + It should also handle the case dirsize=None and return the max timeout. + + :param dirsize: directory size (float). + :return: time-out in seconds (int). + """ + + timeout_max = 3 * 3600 # 3 hours + timeout_min = 30 + timeout = timeout_min + int(60.0 + dirsize / 5.0) if dirsize else timeout_max + + return min(timeout, timeout_max) + + def _do_stageout(job, xdata, activity, queue, title, output_dir='', rucio_host='', ipv='IPv6'): """ Use the `StageOutClient` in the Data API to perform stage-out. 
@@ -965,18 +1029,34 @@ def _stage_out_new(job: Any, args: Any) -> bool: job.status['LOG_TRANSFER'] = LOG_TRANSFER_IN_PROGRESS logfile = job.logdata[0] + # write time stamps to pilot timing file + current_time = time.time() + add_to_pilot_timing(job.jobid, PILOT_PRE_LOG_TAR, current_time, args) + try: tarball_name = f'tarball_PandaJob_{job.jobid}_{job.infosys.pandaqueue}' - input_files = [fspec.lfn for fspec in job.indata] - output_files = [fspec.lfn for fspec in job.outdata] create_log(job.workdir, logfile.lfn, tarball_name, args.cleanup, - input_files=input_files, output_files=output_files, + input_files=[fspec.lfn for fspec in job.indata], + output_files=[fspec.lfn for fspec in job.outdata], piloterrors=job.piloterrorcodes, debugmode=job.debug) except LogFileCreationFailure as error: logger.warning(f'failed to create tar file: {error}') set_pilot_state(job=job, state="failed") job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOGFILECREATIONFAILURE) return False + except PilotException as error: + logger.warning(f'failed to create tar file: {error}') + set_pilot_state(job=job, state="failed") + if 'timed out' in error.get_detail(): + delta = int(time.time() - current_time) + msg = f'tar command for log file creation timed out after {delta} s: {error.get_detail()}' + else: + msg = error.get_detail() + job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg) + return False + + # write time stamps to pilot timing file + add_to_pilot_timing(job.jobid, PILOT_POST_LOG_TAR, time.time(), args) if not _do_stageout(job, [logfile], ['pl', 'pw', 'w'], args.queue, title='log', output_dir=args.output_dir, rucio_host=args.rucio_host, ipv=args.internet_protocol_version): @@ -993,16 +1073,7 @@ def _stage_out_new(job: Any, args: Any) -> bool: add_to_pilot_timing(job.jobid, PILOT_POST_STAGEOUT, time.time(), args) # generate fileinfo details to be sent to Panda - fileinfo = {} - checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum' - for iofile in job.outdata + job.logdata: - if iofile.status in ['transferred']: - fileinfo[iofile.lfn] = {'guid': iofile.guid, - 'fsize': iofile.filesize, - f'{checksum_type}': iofile.checksum.get(config.File.checksum_type), - 'surl': iofile.turl} - - job.fileinfo = fileinfo + job.fileinfo = generate_fileinfo(job) # WARNING THE FOLLOWING RESETS ANY PREVIOUS STAGEOUT ERRORS if not is_success: @@ -1018,12 +1089,28 @@ def _stage_out_new(job: Any, args: Any) -> bool: logger.debug(f'changing job state from {job.state} to finished') set_pilot_state(job=job, state="finished") - # send final server update since all transfers have finished correctly - # send_state(job, args, 'finished', xml=dumps(fileinfodict)) - return is_success +def generate_fileinfo(job): + """ + Generate fileinfo details to be sent to Panda. + + :param job: job object. + """ + + fileinfo = {} + checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum' + for iofile in job.outdata + job.logdata: + if iofile.status in ['transferred']: + fileinfo[iofile.lfn] = {'guid': iofile.guid, + 'fsize': iofile.filesize, + f'{checksum_type}': iofile.checksum.get(config.File.checksum_type), + 'surl': iofile.turl} + + return fileinfo + + def queue_monitoring(queues, traces, args): """ Monitoring of Data queues. 
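The tar command in create_log() is now bounded by a size-dependent timeout. A minimal sketch of that scaling, restating the get_tar_timeout() logic added above (the directory sizes below are made-up examples, not values from the patch):

```python
# Mirrors get_tar_timeout() from pilot/control/data.py; the sample sizes are illustrative.
def get_tar_timeout(dirsize):
    """Return a tar timeout in seconds for a directory size given in MB."""
    timeout_max = 3 * 3600  # 3 hours
    timeout_min = 30
    timeout = timeout_min + int(60.0 + dirsize / 5.0) if dirsize else timeout_max
    return min(timeout, timeout_max)

for size_mb in (None, 10, 1000, 100000):
    print(size_mb, get_tar_timeout(size_mb))
# None   -> 10800 (max timeout; used when get_directory_size() returned None)
# 10     -> 92
# 1000   -> 290
# 100000 -> 10800 (capped at 3 hours)
```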
diff --git a/pilot/control/job.py b/pilot/control/job.py index 6de5cdefe..139859e87 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -16,7 +16,6 @@ import time import hashlib import logging -import multiprocessing import queue from collections import namedtuple @@ -48,7 +47,7 @@ from pilot.util.middleware import containerise_general_command from pilot.util.monitoring import job_monitor_tasks, check_local_space from pilot.util.monitoringtime import MonitoringTime -from pilot.util.processes import cleanup, threads_aborted, kill_process, kill_processes +from pilot.util.processes import cleanup, threads_aborted, kill_process, kill_processes, kill_defunct_children from pilot.util.proxy import get_distinguished_name from pilot.util.queuehandling import scan_for_jobs, put_in_queue, queue_report, purge_queue from pilot.util.realtimelogger import cleanup as rtcleanup @@ -517,8 +516,12 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False): if not os.path.exists(job.workdir): # jobUpdate might be delayed - do not cause problems for new downloaded job logger.warning(f'job.workdir ({job.workdir}) does not exist - ignore kill instruction') return - set_pilot_state(job=job, state="failed") - job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PANDAKILL) + if args.workflow == 'stager': + logger.info('will set interactive job to finished (server will override this, but harvester will not)') + set_pilot_state(job=job, state="finished") # this will let pilot finish naturally and report exit code 0 to harvester + else: + set_pilot_state(job=job, state="failed") + job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PANDAKILL) if job.pid: logger.debug('killing payload process') kill_process(job.pid) @@ -866,11 +869,10 @@ def add_timing_and_extracts(data, job, state, args): :return: """ - time_getjob, time_stagein, time_payload, time_stageout, time_initial_setup, time_setup = timing_report(job.jobid, args) - #data['pilotTiming'] = "%s|%s|%s|%s|%s" % \ - # (time_getjob, time_stagein, time_payload, time_stageout, time_initial_setup + time_setup) + time_getjob, time_stagein, time_payload, time_stageout, time_initial_setup, time_setup, time_log_creation = timing_report(job.jobid, args) data['pilotTiming'] = "%s|%s|%s|%s|%s|%s" % \ (time_getjob, time_stagein, time_payload, time_stageout, time_initial_setup, time_setup) + logger.debug(f'could have reported time_log_creation={time_log_creation} s') # add log extracts (for failed/holding jobs or for jobs with outbound connections) extracts = "" @@ -1652,6 +1654,9 @@ def get_message_from_mb(args): logger.debug('will not start ActiveMQ since graceful_stop is set') return None + # do not put this import at the top since it can possibly interfere with some modules (esp. 
Google Cloud Logging modules) + import multiprocessing + ctx = multiprocessing.get_context('spawn') message_queue = ctx.Queue() #amq_queue = ctx.Queue() @@ -2025,7 +2030,9 @@ def retrieve(queues, traces, args): # noqa: C901 add_to_pilot_timing(job.jobid, PILOT_POST_GETJOB, time.time(), args) # for debugging on HTCondor purposes, set special env var - htcondor_envvar(job.jobid, job.processingtype) + # (only proceed if there is a condor class ad) + if os.environ.get('_CONDOR_JOB_AD', None): + htcondor_envvar(job.jobid) # add the job definition to the jobs queue and increase the job counter, # and wait until the job has finished @@ -2034,6 +2041,9 @@ def retrieve(queues, traces, args): # noqa: C901 jobnumber += 1 while not args.graceful_stop.is_set(): if has_job_completed(queues, args): + # make sure there are no lingering defunct subprocesses + kill_defunct_children(job.pid) + # purge queue(s) that retains job object set_pilot_state(state='') purge_queue(queues.finished_data_in) @@ -2062,24 +2072,21 @@ def retrieve(queues, traces, args): # noqa: C901 logger.info('[job] retrieve thread has finished') -def htcondor_envvar(jobid, processingtype): +def htcondor_envvar(jobid): """ - On HTCondor nodes, set special env var (HTCondor_JOB_ID) for debugging Lustre. + On HTCondor nodes, set special env var (HTCondor_PANDA) for debugging Lustre. :param jobid: PanDA job id (string) - :param processingtype: PanDA processing type (string) :return: """ - # only proceed if there is a condor class ad - if os.environ.get('_CONDOR_JOB_AD', None): - try: - globaljobid = encode_globaljobid(jobid, processingtype) - if globaljobid: - os.environ['HTCondor_JOB_ID'] = globaljobid - logger.info(f'set env var HTCondor_JOB_ID={globaljobid}') - except Exception as exc: - logger.warning(f'caught exception: {exc}') + try: + globaljobid = encode_globaljobid(jobid) + if globaljobid: + os.environ['HTCondor_Job_ID'] = globaljobid + logger.info(f'set env var HTCondor_Job_ID={globaljobid}') + except Exception as exc: + logger.warning(f'caught exception: {exc}') def handle_proxy(job): @@ -2203,7 +2210,7 @@ def has_job_completed(queues, args): job.prodproxy = '' # cleanup of any remaining processes - if job.pid: + if job.pid and job.pid not in job.zombies: job.zombies.append(job.pid) cleanup(job, args) @@ -2767,10 +2774,12 @@ def job_monitor(queues, traces, args): # noqa: C901 peeking_time = start_time update_time = peeking_time + # keep track of jobs we don't want to continue monitoring + # no_monitoring = {} # { job:id: time.time(), .. 
} + # overall loop counter (ignoring the fact that more than one job may be running) n = 0 cont = True - first = True while cont: # abort in case graceful_stop has been set, and less than 30 s has passed since MAXTIME was reached (if set) @@ -2815,37 +2824,22 @@ def job_monitor(queues, traces, args): # noqa: C901 # sleep for a while if stage-in has not completed time.sleep(1) continue - elif not queues.finished_data_in.empty(): - # stage-in has finished, or there were no input files to begin with, job object ends up in finished_data_in queue - if args.workflow == 'stager': - if first: - logger.debug('stage-in finished - waiting for lease time to finish') - first = False - if args.pod: - # wait maximum args.leasetime seconds, then abort - time.sleep(10) - time_now = int(time.time()) - if time_now - start_time >= args.leasetime: - logger.warning(f'lease time is up: {time_now - start_time} s has passed since start - abort stager pilot') - jobs[i].stageout = 'log' # only stage-out log file - put_in_queue(jobs[i], queues.data_out) - #args.graceful_stop.set() - else: - continue - else: - continue - - if args.workflow == 'stager': - logger.debug('stage-in has finished - no need for job_monitor to continue') - break # peek at the jobs in the validated_jobs queue and send the running ones to the heartbeat function - jobs = queues.monitored_payloads.queue if args.workflow != 'stager' else None + jobs = queues.monitored_payloads.queue if jobs: # update the peeking time peeking_time = int(time.time()) + + # stop_monitoring = False # continue with the main loop (while cont) for i in range(len(jobs)): current_id = jobs[i].jobid + #if current_id in no_monitoring: + # stop_monitoring = True + # delta = peeking_time - no_monitoring.get(current_id, 0) + # if delta > 60: + # logger.warning(f'job monitoring has waited {delta} s for job {current_id} to finish - aborting') + # break error_code = None if abort_job and args.signal: @@ -2873,13 +2867,21 @@ def job_monitor(queues, traces, args): # noqa: C901 logger.info('monitor loop #%d: job %d:%s is in state \'%s\'', n, i, current_id, jobs[i].state) if jobs[i].state == 'finished' or jobs[i].state == 'failed': logger.info('will abort job monitoring soon since job state=%s (job is still in queue)', jobs[i].state) - # abort = True - do not set abort here as it will abort the entire thread, not just the current monitor loop + if args.workflow == 'stager': # abort interactive stager pilot, this will trigger an abort of all threads + set_pilot_state(job=jobs[i], state="finished") + logger.info('ordering log transfer') + jobs[i].stageout = 'log' # only stage-out log file + put_in_queue(jobs[i], queues.data_out) + cont = False + # no_monitoring[current_id] = int(time.time()) break # perform the monitoring tasks exit_code, diagnostics = job_monitor_tasks(jobs[i], mt, args) logger.debug(f'job_monitor_tasks returned {exit_code}, {diagnostics}') if exit_code != 0: + # do a quick server update with the error diagnostics only + preliminary_server_update(jobs[i], args, diagnostics) if exit_code == errors.VOMSPROXYABOUTTOEXPIRE: # attempt to download a new proxy since it is about to expire ec = download_new_proxy(role='production') @@ -2889,6 +2891,11 @@ def job_monitor(queues, traces, args): # noqa: C901 logger.debug('killing payload process') kill_process(jobs[i].pid) break + elif exit_code == errors.LEASETIME: # stager mode, order log stage-out + set_pilot_state(job=jobs[i], state="finished") + logger.info('ordering log transfer') + jobs[i].stageout = 'log' # only stage-out 
log file + put_in_queue(jobs[i], queues.data_out) elif exit_code == 0: # ie if download of new proxy was successful diagnostics = "" @@ -2924,6 +2931,9 @@ def job_monitor(queues, traces, args): # noqa: C901 #abort = True break + #if stop_monitoring: + # continue + elif os.environ.get('PILOT_JOB_STATE') == 'stagein': logger.info('job monitoring is waiting for stage-in to finish') #else: @@ -2947,6 +2957,28 @@ def job_monitor(queues, traces, args): # noqa: C901 logger.info('[job] job monitor thread has finished') +def preliminary_server_update(job, args, diagnostics): + """ + Send a quick job update to the server (do not send any error code yet) for a failed job. + + :param job: job object + :param args: args object + :param diagnostics: error diagnostics (string). + """ + + logger.debug(f'could have sent diagnostics={diagnostics}') + piloterrorcode = job.piloterrorcode + piloterrorcodes = job.piloterrorcodes + piloterrordiags = job.piloterrordiags + job.piloterrorcode = 0 + job.piloterrorcodes = [] + job.piloterrordiags = [diagnostics] + send_state(job, args, 'running') + job.piloterrorcode = piloterrorcode + job.piloterrorcodes = piloterrorcodes + job.piloterrordiags = piloterrordiags + + def get_signal_error(sig): """ Return a corresponding pilot error code for the given signal. diff --git a/pilot/control/payload.py b/pilot/control/payload.py index 08a77c811..ba7064936 100644 --- a/pilot/control/payload.py +++ b/pilot/control/payload.py @@ -339,6 +339,25 @@ def get_transport(catchall): return transport +def get_rtlogging(): + """ + Return the proper rtlogging value from the experiment specific plug-in or the config file. + + :return: rtlogging (str). + """ + + rtlogging = None + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + try: + user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) + rtlogging = user.get_rtlogging() + except Exception as exc: + rtlogging = config.Pilot.rtlogging + logger.warning(f'found no experiment specific rtlogging, using config value ({rtlogging}): {exc}') + + return rtlogging + + def get_logging_info(job, args): """ Extract the logging type/protocol/url/port from catchall if present, or from args fields. 
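get_rtlogging() keeps the `<type>;<protocol>://<server>:<port>` convention from the config file, which the existing findall pattern in get_logging_info() then decomposes. A small sketch of that parsing, using the sPHENIX value added later in this patch as input:

```python
# Shows how the pattern used in get_logging_info() splits an rtlogging string;
# the input string is the sPHENIX default added in pilot/user/sphenix/common.py.
from re import findall

pattern = r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)'
info = findall(pattern, 'logstash;http://splogstash.sdcc.bnl.gov:8080')
print(info)  # [('logstash', 'http', 'splogstash.sdcc.bnl.gov', '8080')]
```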
@@ -365,7 +384,7 @@ def get_logging_info(job, args): logserver = args.realtime_logging_server if args.realtime_logging_server else "" pattern = r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)' - info = findall(pattern, config.Pilot.rtlogging) + info = findall(pattern, get_rtlogging()) if not logserver and not info: logger.warning('not enough info available for activating real-time logging') diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index b6f828d0d..285d98a0f 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -18,7 +18,7 @@ from pilot.common.errorcodes import ErrorCodes from pilot.control.job import send_state -from pilot.util.auxiliary import set_pilot_state, show_memory_usage +from pilot.util.auxiliary import set_pilot_state #, show_memory_usage from pilot.util.config import config from pilot.util.container import execute from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_WITH_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED, \ @@ -551,7 +551,7 @@ def get_payload_command(self, job): try: pilot_user = os.environ.get('PILOT_USER', 'generic').lower() user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) - cmd = user.get_payload_command(job) #+ 'sleep 1000' # to test looping jobs + cmd = user.get_payload_command(job) # + 'sleep 900' # to test looping jobs except PilotException as error: self.post_setup(job) import traceback @@ -682,8 +682,8 @@ def run(self): # noqa: C901 logger.info('payload iteration loop #%d', iteration + 1) os.environ['PILOT_EXEC_ITERATION_COUNT'] = '%s' % iteration - if self.__args.debug: - show_memory_usage() + #if self.__args.debug: + # show_memory_usage() # first run the preprocess (if necessary) - note: this might update jobparams -> must update cmd jobparams_pre = self.__job.jobparams @@ -826,20 +826,49 @@ def stop_utilities(self): """ pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) for utcmd in list(self.__job.utilities.keys()): utproc = self.__job.utilities[utcmd][0] if utproc: - user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) - sig = user.get_utility_command_kill_signal(utcmd) - logger.info("stopping process \'%s\' with signal %d", utcmd, sig) - try: - os.killpg(os.getpgid(utproc.pid), sig) - except Exception as error: - logger.warning('exception caught: %s (ignoring)', error) - + status = self.kill_and_wait_for_process(utproc.pid, user, utcmd) + if status == 0: + logger.info(f'cleaned up after prmon process {utproc.pid}') + else: + logger.warning(f'failed to cleanup prmon process {utproc.pid} (abnormal exit status: {status})') user.post_utility_command_action(utcmd, self.__job) + def kill_and_wait_for_process(self, pid, user, utcmd): + """ + Kill utility process and wait for it to finish. + + :param pid: process id (int) + :param user: pilot user/experiment (str) + :param utcmd: utility command (str) + :return: process exit status (int, None). 
+ """ + + sig = user.get_utility_command_kill_signal(utcmd) + logger.info("stopping process \'%s\' with signal %d", utcmd, sig) + + try: + # Send SIGUSR1 signal to the process group + os.killpg(pid, sig) + + # Wait for the process to finish + _, status = os.waitpid(pid, 0) + + # Check the exit status of the process + if os.WIFEXITED(status): + return os.WEXITSTATUS(status) + else: + # Handle abnormal termination if needed + return None + except OSError as exc: + # Handle errors, such as process not found + logger.warning(f"exception caught: {exc}") + return None + def rename_log_files(self, iteration): """ diff --git a/pilot/copytool/s3.py b/pilot/copytool/s3.py index a0a480bc9..d79e2008e 100644 --- a/pilot/copytool/s3.py +++ b/pilot/copytool/s3.py @@ -16,27 +16,53 @@ except Exception: pass +from glob import glob +from urllib.parse import urlparse + from .common import resolve_common_transfer_errors from pilot.common.errorcodes import ErrorCodes from pilot.common.exception import PilotException +from pilot.info import infosys +from pilot.util.config import config from pilot.util.ruciopath import get_rucio_path logger = logging.getLogger(__name__) errors = ErrorCodes() -require_replicas = False ## indicates if given copytool requires input replicas to be resolved -require_input_protocols = True ## indicates if given copytool requires input protocols and manual generation of input replicas -require_protocols = True ## indicates if given copytool requires protocols to be resolved first for stage-out +require_replicas = False # indicates if given copytool requires input replicas to be resolved +require_input_protocols = True # indicates if given copytool requires input protocols and manual generation of input replicas +require_protocols = True # indicates if given copytool requires protocols to be resolved first for stage-out allowed_schemas = ['srm', 'gsiftp', 'https', 'davs', 'root', 's3', 's3+rucio'] def is_valid_for_copy_in(files): - return True ## FIX ME LATER + return True # FIX ME LATER def is_valid_for_copy_out(files): - return True ## FIX ME LATER + return True # FIX ME LATER + + +def get_pilot_s3_profile(): + return os.environ.get("PANDA_PILOT_AWS_PROFILE", None) + + +def get_copy_out_extend(): + return os.environ.get("PANDA_PILOT_COPY_OUT_EXTEND", None) + + +def get_endpoint_bucket_key(surl): + parsed = urlparse(surl) + endpoint = parsed.scheme + '://' + parsed.netloc + full_path = parsed.path + while "//" in full_path: + full_path = full_path.replace('//', '/') + + parts = full_path.split('/') + bucket = parts[1] + key = '/'.join(parts[2:]) + return endpoint, bucket, key def resolve_surl(fspec, protocol, ddmconf, **kwargs): @@ -49,6 +75,13 @@ def resolve_surl(fspec, protocol, ddmconf, **kwargs): :param fspec: file spec data :return: dictionary {'surl': surl} """ + try: + pandaqueue = infosys.pandaqueue + except Exception: + pandaqueue = "" + if pandaqueue is None: + pandaqueue = "" + ddm = ddmconf.get(fspec.ddmendpoint) if not ddm: raise PilotException('failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint) @@ -56,11 +89,27 @@ def resolve_surl(fspec, protocol, ddmconf, **kwargs): if ddm.is_deterministic: surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), get_rucio_path(fspec.scope, fspec.lfn)) elif ddm.type in ['OS_ES', 'OS_LOGS']: - surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), fspec.lfn) + try: + pandaqueue = infosys.pandaqueue + except Exception: + pandaqueue = "" + if pandaqueue is None: + pandaqueue = "" + + 
dataset = fspec.dataset + if dataset: + dataset = dataset.replace("#{pandaid}", os.environ['PANDAID']) + else: + dataset = "" + + remote_path = os.path.join(protocol.get('path', ''), pandaqueue, dataset) + surl = protocol.get('endpoint', '') + remote_path + fspec.protocol_id = protocol.get('id') else: raise PilotException('resolve_surl(): Failed to construct SURL for non deterministic ddm=%s: NOT IMPLEMENTED', fspec.ddmendpoint) + logger.info('resolve_surl, surl: %s', surl) # example: # protocol = {u'path': u'/atlas-eventservice', u'endpoint': u's3://s3.cern.ch:443/', u'flavour': u'AWS-S3-SSL', u'id': 175} # surl = 's3://s3.cern.ch:443//atlas-eventservice/EventService_premerge_24706191-5013009653-24039149400-322-5.tar' @@ -79,12 +128,12 @@ def copy_in(files, **kwargs): dst = fspec.workdir or kwargs.get('workdir') or '.' - bucket = 'bucket' # UPDATE ME + # bucket = 'bucket' # UPDATE ME path = os.path.join(dst, fspec.lfn) - logger.info('downloading object %s from bucket=%s to local file %s', fspec.lfn, bucket, path) - status, diagnostics = download_file(path, bucket, object_name=fspec.lfn) + logger.info('downloading surl %s to local file %s', fspec.surl, path) + status, diagnostics = download_file(path, fspec.surl) - if not status: ## an error occurred + if not status: # an error occurred error = resolve_common_transfer_errors(diagnostics, is_stagein=True) fspec.status = 'failed' fspec.status_code = error.get('rcode') @@ -96,22 +145,21 @@ def copy_in(files, **kwargs): return files -def download_file(path, bucket, object_name=None): +def download_file(path, surl, object_name=None): """ Download a file from an S3 bucket. :param path: Path to local file after download (string). - :param bucket: Bucket to download from. + :param surl: Source url to download from. :param object_name: S3 object name. If not specified then file_name from path is used. :return: True if file was uploaded (else False), diagnostics (string). """ - # if S3 object_name was not specified, use file name from path - if object_name is None: - object_name = os.path.basename(path) - try: - s3 = boto3.client('s3') + endpoint, bucket, object_name = get_endpoint_bucket_key(surl) + session = boto3.Session(profile_name=get_pilot_s3_profile()) + # s3 = boto3.client('s3') + s3 = session.client('s3', endpoint_url=endpoint) s3.download_file(bucket, object_name, path) except ClientError as error: diagnostics = 'S3 ClientError: %s' % error @@ -125,6 +173,62 @@ def download_file(path, bucket, object_name=None): return True, "" +def copy_out_extend(files, **kwargs): + """ + Upload given files to S3 storage. 
+ + :param files: list of `FileSpec` objects + :raise: PilotException in case of controlled error + """ + + workdir = kwargs.pop('workdir') + + for fspec in files: + + # path = os.path.join(workdir, fspec.lfn) + logger.info('uploading %s to fspec.turl %s', workdir, fspec.turl) + + logfiles = [] + lfn = fspec.lfn.strip() + if lfn == '/' or lfn.endswith("log.tgz"): + # ["pilotlog.txt", "payload.stdout", "payload.stderr"]: + logfiles += glob(workdir + '/payload*.*') + logfiles += glob(workdir + '/memory_monitor*.*') + # if lfn.find('/') < 0: + # lfn_path = os.path.join(workdir, lfn) + # if os.path.exists(lfn_path) and lfn_path not in logfiles: + # logfiles += [lfn_path] + logfiles += glob(workdir + '/pilotlog*.*') + else: + logfiles = [os.path.join(workdir, lfn)] + + for path in logfiles: + logfile = os.path.basename(path) + if os.path.exists(path): + full_url = os.path.join(fspec.turl, logfile) + logger.info('uploading %s to%s', path, full_url) + status, diagnostics = upload_file(path, full_url) + + if not status: # an error occurred + # create new error code(s) in ErrorCodes.py and set it/them in resolve_common_transfer_errors() + error = resolve_common_transfer_errors(diagnostics, is_stagein=False) + fspec.status = 'failed' + fspec.status_code = error.get('rcode') + raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state')) + else: + diagnostics = 'local output file does not exist: %s' % path + logger.warning(diagnostics) + fspec.status = 'failed' + fspec.status_code = errors.STAGEOUTFAILED + # raise PilotException(diagnostics, code=fspec.status_code, state=fspec.status) + + if fspec.status is None: + fspec.status = 'transferred' + fspec.status_code = 0 + + return files + + def copy_out(files, **kwargs): """ Upload given files to S3 storage. @@ -133,17 +237,21 @@ def copy_out(files, **kwargs): :raise: PilotException in case of controlled error """ + if get_copy_out_extend(): + return copy_out_extend(files, **kwargs) + workdir = kwargs.pop('workdir') for fspec in files: path = os.path.join(workdir, fspec.lfn) if os.path.exists(path): - bucket = 'bucket' # UPDATE ME - logger.info('uploading %s to bucket=%s using object name=%s', path, bucket, fspec.lfn) - status, diagnostics = upload_file(path, bucket, object_name=fspec.lfn) + # bucket = 'bucket' # UPDATE ME + logger.info('uploading %s to fspec.turl %s', path, fspec.turl) + full_url = os.path.join(fspec.turl, fspec.lfn) + status, diagnostics = upload_file(path, full_url) - if not status: ## an error occurred + if not status: # an error occurred # create new error code(s) in ErrorCodes.py and set it/them in resolve_common_transfer_errors() error = resolve_common_transfer_errors(diagnostics, is_stagein=False) fspec.status = 'failed' @@ -162,25 +270,27 @@ def copy_out(files, **kwargs): return files -def upload_file(file_name, bucket, object_name=None): +def upload_file(file_name, full_url, object_name=None): """ Upload a file to an S3 bucket. :param file_name: File to upload. - :param bucket: Bucket to upload to. + :param turl: Target url to upload to. :param object_name: S3 object name. If not specified then file_name is used. :return: True if file was uploaded (else False), diagnostics (string). 
""" - # if S3 object_name was not specified, use file_name - if object_name is None: - object_name = file_name - # upload the file try: - s3_client = boto3.client('s3') - #response = s3_client.upload_file(file_name, bucket, object_name) + # s3_client = boto3.client('s3') + endpoint, bucket, object_name = get_endpoint_bucket_key(full_url) + session = boto3.Session(profile_name=get_pilot_s3_profile()) + s3_client = session.client('s3', endpoint_url=endpoint) + # response = s3_client.upload_file(file_name, bucket, object_name) s3_client.upload_file(file_name, bucket, object_name) + if object_name.endswith(config.Pilot.pilotlog): + os.environ['GTAG'] = full_url + logger.debug("Set envvar GTAG with the pilotLot URL=%s", full_url) except ClientError as error: diagnostics = 'S3 ClientError: %s' % error logger.critical(diagnostics) diff --git a/pilot/info/jobdata.py b/pilot/info/jobdata.py index 2c283f38d..d78cff81a 100644 --- a/pilot/info/jobdata.py +++ b/pilot/info/jobdata.py @@ -975,7 +975,7 @@ def collect_zombies(self, depth=None): depth -= 1 for zombie in self.zombies: try: - logger.info(f"zombie collector trying to kill pid {zombie}") + logger.info(f"zombie collector waiting for pid {zombie}") _id, _ = os.waitpid(zombie, os.WNOHANG) except OSError as exc: logger.info(f"harmless exception when collecting zombies: {exc}") diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index f7816b83d..0ef100270 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -1108,6 +1108,10 @@ def update_job_data(job): # extract output files from the job report if required, in case the trf # has created additional (overflow) files. Also make sure all guids are # assigned (use job report value if present, otherwise generate the guid) + is_raythena = os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena' + if is_raythena: + return + if job.metadata and not job.is_eventservice: # keep this for now, complicated to merge with verify_output_files? extract_output_file_guids(job) @@ -2619,8 +2623,7 @@ def get_utility_command_kill_signal(name): """ # note that the NetworkMonitor does not require killing (to be confirmed) - sig = SIGUSR1 if name == 'MemoryMonitor' else SIGTERM - return sig + return SIGUSR1 if name == 'MemoryMonitor' else SIGTERM def get_utility_command_output_filename(name, selector=None): @@ -2632,12 +2635,7 @@ def get_utility_command_output_filename(name, selector=None): :return: filename (string). 
""" - if name == 'MemoryMonitor': - filename = get_memory_monitor_summary_filename(selector=selector) - else: - filename = "" - - return filename + return get_memory_monitor_summary_filename(selector=selector) if name == 'MemoryMonitor' else "" def verify_lfn_length(outdata): diff --git a/pilot/user/atlas/diagnose.py b/pilot/user/atlas/diagnose.py index 031278a05..14af95bf5 100644 --- a/pilot/user/atlas/diagnose.py +++ b/pilot/user/atlas/diagnose.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023 import json import os @@ -14,9 +14,9 @@ from glob import glob from pilot.common.errorcodes import ErrorCodes -from pilot.common.exception import PilotException, BadXML +from pilot.common.exception import PilotException, BadXML, FileHandlingFailure, NoSuchFile from pilot.util.config import config -from pilot.util.filehandling import get_guid, tail, grep, open_file, read_file, scan_file #, write_file +from pilot.util.filehandling import get_guid, tail, grep, open_file, read_file, scan_file, write_json, copy from pilot.util.math import convert_mb_to_b from pilot.util.workernode import get_local_disk_space @@ -577,11 +577,15 @@ def process_job_report(job): # get the metadata from the xml file instead, which must exist for most production transforms process_metadata_from_xml(job) else: + _metadata = {} # used to overwrite original metadata file in case of changes with open(path) as data_file: # compulsory field; the payload must produce a job report (see config file for file name), attach it to the # job object job.metadata = json.load(data_file) + # truncate warnings if necessary (note: _metadata will remain unset if there are no changes) + _metadata = truncate_metadata(job.metadata) + # update job data if necessary update_job_data(job) @@ -633,6 +637,69 @@ def process_job_report(job): job.piloterrorcode = errors.BADALLOC job.piloterrordiag = diagnostics + if _metadata: + # overwrite job.metadata since it was updated and overwrite the json file + job.metadata = _metadata + overwrite_metadata(_metadata, path) + + +def truncate_metadata(job_report_dictionary): + """ + Truncate the metadata if necessary. + + This function will truncate the job.metadata if some fields are too large. This can at least happen with the 'WARNINGS' + field. + + :param job_report_dictionary: original job.metadata (dictionary) + :return: updated metadata, empty if no updates (dictionary). + """ + + _metadata = {} + + limit = 25 + if 'executor' in job_report_dictionary: + try: + warnings = job_report_dictionary['executor'][0]['logfileReport']['details']['WARNING'] + except KeyError as exc: + logger.debug(f"jobReport has no such key: {exc} (ignore)") + except (TypeError, IndexError) as exc: + logger.warning(f"caught exception (aborting jobReport scan): {exc}") + else: + if isinstance(warnings, list) and len(warnings) > limit: + job_report_dictionary['executor'][0]['logfileReport']['details']['WARNING'] = warnings[0:limit] + _metadata = job_report_dictionary + logger.warning(f'truncated jobReport WARNING field to length: {limit}') + else: + logger.warning("jobReport does not have the executor key (aborting)") + + return _metadata + + +def overwrite_metadata(metadata, path): + """ + Overwrite the original metadata with updated info. + Also make a backup of the original file. 
+ """ + + # make a backup of the original metadata file + try: + copy(path, path + '.original') + except (IOError, FileHandlingFailure, NoSuchFile) as exc: + logger.warning(f'failed to make a backup of {path} (ignore): {exc}') + else: + logger.info(f'backed up original metadata file: {path}') + + # store the updated metadata + try: + status = write_json(path, metadata) + except (IOError, FileHandlingFailure) as exc: + logger.warning(f'caught exception while writing metadata json: {exc}') + else: + if status: + logger.info(f'overwrote {path} with updated metadata') + else: + logger.warning(f'failed to overwrite {path} with updated metadata (ignore)') + def get_frontier_details(job_report_dictionary): """ diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py index 1795f8155..5dc503436 100644 --- a/pilot/user/atlas/utilities.py +++ b/pilot/user/atlas/utilities.py @@ -153,18 +153,6 @@ def get_proper_pid(pid, pgrp, jobid, command="", transformation="", outdata="", if not is_process_running(pid): return -1 - #_cmd = get_trf_command(command, transformation=transformation) - # get ps info using group id - ps = get_ps_info(pgrp) - #if dump_ps: - # logger.debug('ps:\n%s' % ps) - #logger.debug('ps:\n%s' % ps) - #logger.debug('attempting to identify pid for Singularity (v.3) runtime parent process') - #_pid = get_pid_for_command(ps, command="Singularity runtime parent") - #if _pid: - # logger.debug('discovered pid=%d for process \"%s\"' % (_pid, _cmd)) - # return _pid - i = 0 imax = 120 while i < imax: @@ -883,7 +871,10 @@ def filter_output(stdout): cmd = setup + script # CPU arch script has now been copied, time to execute it + # (reset irrelevant stderr) ec, stdout, stderr = execute(cmd) + if ec == 0 and 'RHEL9 and clone support is relatively new' in stderr: + stderr = '' if ec or stderr: logger.warning(f'ec={ec}, stdout={stdout}, stderr={stderr}') else: diff --git a/pilot/user/sphenix/common.py b/pilot/user/sphenix/common.py index dadd86aeb..094716d15 100644 --- a/pilot/user/sphenix/common.py +++ b/pilot/user/sphenix/common.py @@ -456,3 +456,23 @@ def get_pilot_id(jobid): """ return os.environ.get("GTAG", "unknown") + + +def get_rtlogging(): + """ + Return the proper rtlogging value. + + :return: rtlogging (str). + """ + + return 'logstash;http://splogstash.sdcc.bnl.gov:8080' + + +def get_rtlogging_ssl(): + """ + Return the proper ssl_enable and ssl_verify for real-time logging. + + :return: ssl_enable (bool), ssl_verify (bool) (tuple). + """ + + return False, False diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py index 238e140b0..0c136ffdd 100644 --- a/pilot/util/auxiliary.py +++ b/pilot/util/auxiliary.py @@ -658,46 +658,34 @@ def sort_words(input_str: str) -> str: return output_str -def encode_globaljobid(jobid: str, processingtype: str, maxsize: int = 31) -> str: +def encode_globaljobid(jobid: str, maxsize: int = 31) -> str: """ Encode the global job id on HTCondor. To be used as an environmental variable on HTCondor nodes to facilitate debugging. Format: ::._ + NEW FORMAT: WN hostname, process and user id + Note: due to batch system restrictions, this string is limited to 31 (maxsize) characters, using the least significant characters (i.e. the left part of the string might get cut). Also, the cluster ID and process IDs are converted to hex to limit the sizes. The schedd host name is further encoded using the last digit in the host name (spce03.sdcc.bnl.gov -> spce03 -> 3). 
:param jobid: panda job id (string) - :param processingtype: panda processing type (string) :param maxsize: max length allowed (int) :return: encoded global job id (string). """ - def reformat(num: str, maxsize: int = 8) -> str: - # can handle clusterid=4294967297, ie larger than 0xffffffff - try: - num_hex = hex(int(num)).replace('0x', '') - if len(num_hex) > maxsize: # i.e. larger than 'ffffffff' or 'ff' - num_hex = num_hex[-maxsize:] # take the least significant bits - num_hex = '0x' + num_hex - num_int = int(num_hex, base=16) - size = "{0:0" + str(maxsize) + "x}" # e.g. "{0:08x}" - num_hex = size.format(num_int) - except (IndexError, ValueError, TypeError) as exc: - logger.warning(exc) - num_hex = "" - return num_hex - - def get_schedd_id(host: str) -> str: - # spce03.sdcc.bnl.gov -> spce03 -> 3 - try: - schedd_id = host.split('.')[0][-1] - except IndexError as exc: - logger.warning(f'failed to extract schedd from host={host}: {exc}') - schedd_id = None - return schedd_id + def get_host_name(): + # spool1462.sdcc.bnl.gov -> spool1462 + if 'PANDA_HOSTNAME' in os.environ: + host = os.environ.get('PANDA_HOSTNAME') + elif hasattr(os, 'uname'): + host = os.uname()[1] + else: + import socket + host = socket.gethostname() + return host.split('.')[0] globaljobid = get_globaljobid() if not globaljobid: @@ -705,22 +693,18 @@ def get_schedd_id(host: str) -> str: try: _globaljobid = globaljobid.split('#') - host = _globaljobid[0] + # host = _globaljobid[0] tmp = _globaljobid[1].split('.') # timestamp = _globaljobid[2] - ignore this one - clusterid = tmp[0] + # clusterid = tmp[0] processid = tmp[1] except IndexError as exc: logger.warning(exc) return "" - logger.debug(f'clusterid={clusterid}') - logger.debug(f'host name={host}') - clusterid_hex = reformat(clusterid, maxsize=8) # 00283984 - processid_hex = reformat(processid, maxsize=2) # 00 - schedd_id = get_schedd_id(host) # 3 - if clusterid_hex and processid_hex and schedd_id: - global_name = f'{jobid}:{processingtype}:{clusterid_hex}.{processid_hex}_{schedd_id}' + host_name = get_host_name() + if processid and host_name: + global_name = f'{host_name}_{processid}_{jobid}' else: global_name = '' @@ -732,3 +716,40 @@ def get_schedd_id(host: str) -> str: logger.debug(f'HTCondor: global name is within limits: {global_name} (length={len(global_name)}, max size={maxsize})') return global_name + + +def grep_str(patterns, stdout): + """ + Search for the patterns in the given stdout. + For expected large stdout, better to use FileHandling::grep() + + :param patterns: list of regexp patterns. + :param stdout: some text (string). + :return: list of matched lines in stdout. + """ + + matched_lines = [] + _pats = [] + for pattern in patterns: + _pats.append(re.compile(pattern)) + + lines = stdout.split('\n') + for line in lines: + # can the search pattern be found? 
+ for _cp in _pats: + if re.search(_cp, line): + matched_lines.append(line) + + return matched_lines + + +class TimeoutException(Exception): + + def __init__(self, message, timeout=None, *args): + self.timeout = timeout + self.message = message + self._errorCode = 1334 + super(TimeoutException, self).__init__(*args) + + def __str__(self): + return "%s: %s, timeout=%s seconds%s" % (self.__class__.__name__, self.message, self.timeout, ' : %s' % repr(self.args) if self.args else '') diff --git a/pilot/util/constants.py b/pilot/util/constants.py index a2dd8b14a..86d7da68b 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -13,8 +13,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '4' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '7' # build number should be reset to '1' for every new development cycle +REVISION = '5' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '32' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 @@ -45,6 +45,8 @@ PILOT_POST_PAYLOAD = 'PILOT_POST_PAYLOAD' PILOT_PRE_STAGEOUT = 'PILOT_PRE_STAGEOUT' PILOT_POST_STAGEOUT = 'PILOT_POST_STAGEOUT' +PILOT_PRE_LOG_TAR = 'PILOT_PRE_LOG_TAR' +PILOT_POST_LOG_TAR = 'PILOT_POST_LOG_TAR' PILOT_PRE_FINAL_UPDATE = 'PILOT_PRE_FINAL_UPDATE' PILOT_POST_FINAL_UPDATE = 'PILOT_POST_FINAL_UPDATE' PILOT_END_TIME = 'PILOT_END_TIME' diff --git a/pilot/util/container.py b/pilot/util/container.py index 9f1a43051..cf6e6e1f7 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -6,10 +6,13 @@ # # Authors: # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 - +import os import subprocess import logging -from os import environ, getcwd, setpgrp, getpgid, kill #, getpgid #setsid +import shlex +import threading + +from os import environ, getcwd, getpgid, kill #, setpgrp, getpgid #setsid from time import sleep from signal import SIGTERM, SIGKILL from typing import Any @@ -21,6 +24,9 @@ logger = logging.getLogger(__name__) errors = ErrorCodes() +# Define a global lock for synchronization +execute_lock = threading.Lock() + def execute(executable: Any, **kwargs: dict) -> Any: """ @@ -34,6 +40,7 @@ def execute(executable: Any, **kwargs: dict) -> Any: usecontainer = kwargs.get('usecontainer', False) job = kwargs.get('job') + #shell = kwargs.get("shell", False) obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message # convert executable to string if it is a list @@ -55,34 +62,44 @@ def execute(executable: Any, **kwargs: dict) -> Any: if not kwargs.get('mute', False): print_executable(executable, obscure=obscure) + # always use a timeout to prevent stdout buffer problem in nodes with lots of cores + timeout = get_timeout(kwargs.get('timeout', None)) + logger.debug(f'subprocess.communicate() will use timeout={timeout} s') + exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] # try: intercept exception such as OSError -> report e.g. 
error.RESOURCEUNAVAILABLE: "Resource temporarily unavailable" exit_code = 0 stdout = '' stderr = '' - process = subprocess.Popen(exe, - bufsize=-1, - stdout=kwargs.get('stdout', subprocess.PIPE), - stderr=kwargs.get('stderr', subprocess.PIPE), - cwd=kwargs.get('cwd', getcwd()), - preexec_fn=setpgrp, - encoding='utf-8', - errors='replace') - if kwargs.get('returnproc', False): - return process + # Acquire the lock before creating the subprocess + with execute_lock: + process = subprocess.Popen(exe, + bufsize=-1, + stdout=kwargs.get('stdout', subprocess.PIPE), + stderr=kwargs.get('stderr', subprocess.PIPE), + cwd=kwargs.get('cwd', getcwd()), + preexec_fn=os.setsid, # setpgrp + encoding='utf-8', + errors='replace') + if kwargs.get('returnproc', False): + return process + + try: + stdout, stderr = process.communicate(timeout=timeout) + except subprocess.TimeoutExpired as exc: + # make sure that stdout buffer gets flushed - in case of time-out exceptions + flush_handler(name="stream_handler") + stderr += f'subprocess communicate sent TimeoutExpired: {exc}' + logger.warning(stderr) + exit_code = errors.COMMANDTIMEDOUT + stderr = kill_all(process, stderr) + else: + exit_code = process.poll() - try: - stdout, stderr = process.communicate(timeout=kwargs.get('timeout', None)) - except subprocess.TimeoutExpired as exc: - # make sure that stdout buffer gets flushed - in case of time-out exceptions - flush_handler(name="stream_handler") - stderr += f'subprocess communicate sent TimeoutExpired: {exc}' - logger.warning(stderr) - exit_code = errors.COMMANDTIMEDOUT - stderr = kill_all(process, stderr) - else: - exit_code = process.poll() + # wait for the process to finish + # (not strictly necessary when process.communicate() is used) + process.wait() # remove any added \n if stdout and stdout.endswith('\n'): @@ -91,6 +108,45 @@ def execute(executable: Any, **kwargs: dict) -> Any: return exit_code, stdout, stderr +def get_timeout(requested_timeout): + """ + Define the timeout to be used with subprocess.communicate(). + + If no timeout was requested by the execute() caller, a large default 10 days timeout will be returned. + It is better to give a really large timeout than no timeout at all, since the subprocess python module otherwise + can get stuck processing stdout on nodes with many cores. + + :param requested_timeout: timeout in seconds set by execute() caller (int) + :return: timeout in seconds (int). + """ + + return requested_timeout if requested_timeout else 10 * 24 * 60 * 60 # using a ridiculously large default timeout + + +def execute_command(command: str) -> str: + """ + Executes a command using subprocess without using the shell. + + :param command: The command to execute. + + :return: The output of the command (string). + """ + + try: + logger.info(f'executing command: {command}') + command = shlex.split(command) + proc = subprocess.Popen(command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + proc.wait() + #output, err = proc.communicate() + exit_code = proc.returncode + logger.info(f'command finished with exit code: {exit_code}') + # output = subprocess.check_output(command, text=True) + except subprocess.CalledProcessError as exc: + logger.warning(f"error executing command:\n{command}\nexit code: {exc.returncode}\nStderr: {exc.stderr}") + exit_code = exc.returncode + return exit_code + + def kill_all(process: Any, stderr: str) -> str: """ Kill all processes after a time-out exception in process.communication(). 
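execute() now always hands a timeout to subprocess.communicate() to avoid stdout buffer stalls on many-core nodes; callers that do not request one fall back to the large default from get_timeout(). A minimal sketch of that fallback (the 10800 s value is only an example of a caller-supplied timeout, such as the tar timeout computed in create_log()):

```python
# Restates the get_timeout() fallback added above in pilot/util/container.py.
def get_timeout(requested_timeout):
    return requested_timeout if requested_timeout else 10 * 24 * 60 * 60

print(get_timeout(None))   # 864000 s (~10 days) when the caller sets no timeout
print(get_timeout(10800))  # caller-supplied value, e.g. a tar timeout from create_log()
```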
diff --git a/pilot/util/default.cfg b/pilot/util/default.cfg index 53d09d0bc..bda415fc5 100644 --- a/pilot/util/default.cfg +++ b/pilot/util/default.cfg @@ -45,7 +45,8 @@ pandaserver: https://pandaserver.cern.ch:25443 iddsserver: https://pandaserver.cern.ch:25443 # The log type and URL for the real-time logging server (format: ;) -rtlogging:logstash;http://aipanda020.cern.ch:8443 +# (experiment specific values are preferred, see common.py in user area) +rtlogging: logstash;http://aipanda020.cern.ch:8443 ssl_enable: True ssl_verify: False diff --git a/pilot/util/filehandling.py b/pilot/util/filehandling.py index 7f4f5349f..766ad25ee 100644 --- a/pilot/util/filehandling.py +++ b/pilot/util/filehandling.py @@ -9,12 +9,13 @@ import hashlib import io +import logging import os import re +import subprocess import tarfile import time import uuid -import logging from collections.abc import Iterable, Mapping from glob import glob from json import load, JSONDecodeError @@ -1231,3 +1232,27 @@ def generate_test_file(filename, filesize=1024): with open(filename, 'wb') as fout: fout.write(os.urandom(filesize)) # replace 1024 with a size in kilobytes if it is not unreasonably large + + +def get_directory_size(directory: str) -> float: + """ + Measure the size of the given directory with du -sh. + The function will return None in case of failure. + + :param directory: full directory path (string). + :return: size in MB (float). + """ + + size_mb = None + command = ["du", "-sh", directory] + output = subprocess.check_output(command) + # E.g. '269M /path' + match = re.search(r"^([0-9.]+)\S+(.*)$", output.decode("utf-8")) + if match: + print(match.group(1)) + try: + size_mb = float(match.group(1)) + except ValueError as exc: + logger.warning(f'failed to convert {match.group(1)} to float: {exc}') + # path = match.group(2) + return size_mb diff --git a/pilot/util/harvester.py b/pilot/util/harvester.py index cd79aa996..8663e43ac 100644 --- a/pilot/util/harvester.py +++ b/pilot/util/harvester.py @@ -111,7 +111,7 @@ def get_initial_work_report(): :return: work report dictionary. 
""" - hostname = os.environ.get('PAMDA_HOSTNAME', socket.gethostname()) + hostname = os.environ.get('PANDA_HOSTNAME', socket.gethostname()) work_report = {'jobStatus': 'starting', 'messageLevel': logging.getLevelName(logger.getEffectiveLevel()), 'cpuConversionFactor': 1.0, diff --git a/pilot/util/loopingjob.py b/pilot/util/loopingjob.py index fec63978d..016403cc8 100644 --- a/pilot/util/loopingjob.py +++ b/pilot/util/loopingjob.py @@ -5,15 +5,15 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-20223 from pilot.common.errorcodes import ErrorCodes from pilot.util.auxiliary import whoami, set_pilot_state, cut_output, locate_core_file from pilot.util.config import config -from pilot.util.container import execute +from pilot.util.container import execute #, execute_command from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy, list_mod_files from pilot.util.parameters import convert_to_int -from pilot.util.processes import kill_processes +from pilot.util.processes import kill_processes, find_zombies, handle_zombies, get_child_processes, reap_zombies from pilot.util.timing import time_stamp import os @@ -40,7 +40,6 @@ def looping_job(job, montime): diagnostics = "" logger.info(f'checking for looping job (in state={job.state})') - looping_limit = get_looping_job_limit() if job.state == 'stagein': @@ -60,15 +59,18 @@ def looping_job(job, montime): logger.info(f'current time: {currenttime}') logger.info(f'last time files were touched: {time_last_touched}') logger.info(f'looping limit: {looping_limit} s') + if currenttime - time_last_touched > looping_limit: try: # which were the considered files? list_mod_files(recent_files) # first produce core dump and copy it - create_core_dump(pid=job.pid, workdir=job.workdir) + create_core_dump(job) # set debug mode to prevent core file from being removed before log creation job.debug = True kill_looping_job(job) + exit_code = errors.LOOPINGJOB + diagnostics = 'the payload was found to be looping - job will be failed in the next update' except Exception as error: logger.warning(f'exception caught: {error}') else: @@ -77,29 +79,41 @@ def looping_job(job, montime): return exit_code, diagnostics -def create_core_dump(pid=None, workdir=None): +def create_core_dump(job): """ Create core dump and copy it to work directory + + :param job: job object. 
""" - if not pid or not workdir: + if not job.pid or not job.workdir: logger.warning('cannot create core file since pid or workdir is unknown') return - cmd = 'gdb --pid %d -ex \'generate-core-file\'' % pid + cmd = f'gdb --pid {job.pid} -ex \'generate-core-file\' -ex quit' exit_code, stdout, stderr = execute(cmd) - if not exit_code: - path = locate_core_file(pid=pid) + #exit_code = execute_command(cmd) + if exit_code == 0: + path = locate_core_file(pid=job.pid) if path: try: - copy(path, workdir) + copy(path, job.workdir) except Exception as error: logger.warning(f'failed to copy core file: {error}') else: logger.debug('copied core dump to workdir') - else: - logger.warning(f'failed to execute command: {cmd}, stdout+stderr={stdout + stderr}') + logger.warning(f'failed to execute command: {cmd}, exit code={exit_code}, stdout={stdout}, stderr={stderr}') + + try: + zombies = find_zombies(os.getpid()) + if zombies: + logger.info(f'found zombies: {zombies}') + handle_zombies(zombies, job=job) + else: + logger.info('found no zombies') + except Exception as exp: + logger.warning(f'exception caught: {exp}') def get_time_for_last_touch(job, montime, looping_limit): @@ -166,15 +180,32 @@ def kill_looping_job(job): # the child process is looping, kill it diagnostics = f"pilot has decided to kill looping job {job.jobid} at {time_stamp()}" logger.fatal(diagnostics) + job.debug_command = 'looping' # overrule any other debug command - also prevents real time logging from starting + + # process zombies + if job.pid not in job.zombies: + job.zombies.append(job.pid) + logger.info("pass #1/2: collecting zombie processes") + job.collect_zombies(depth=10) + logger.debug('pass #2/2: reaping zombies') + reap_zombies() cmds = [f'ps -fwu {whoami()}', f'ls -ltr {job.workdir}', f'ps -o pid,ppid,sid,pgid,tpgid,stat,comm -u {whoami()}', 'pstree -g -a'] +# f'ps -f --ppid {os.getpid()} | grep python3'] for cmd in cmds: _, stdout, _ = execute(cmd, mute=True) logger.info(f"{cmd} + '\n': {stdout}") + parent_pid = os.getpid() + child_processes = get_child_processes(parent_pid) + if child_processes: + logger.info(f"child processes of PID {parent_pid}:") + for pid, cmdline in child_processes: + logger.info(f"PID {pid}: {cmdline}") + # set the relevant error code if job.state == 'stagein': job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT) diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 7a127d6a7..31583a837 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2022 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023 # This module contains implementations of job monitoring tasks @@ -13,10 +13,12 @@ import time from subprocess import PIPE from glob import glob +from typing import Any +from signal import SIGKILL from pilot.common.errorcodes import ErrorCodes from pilot.common.exception import PilotException -from pilot.util.auxiliary import set_pilot_state, show_memory_usage +from pilot.util.auxiliary import set_pilot_state #, show_memory_usage from pilot.util.config import config from pilot.util.constants import PILOT_PRE_PAYLOAD from pilot.util.container import execute @@ -25,7 +27,7 @@ from pilot.util.math import convert_mb_to_b, human2bytes from pilot.util.parameters import convert_to_int, get_maximum_input_sizes from pilot.util.processes import get_current_cpu_consumption_time, kill_processes, get_number_of_child_processes,\ - 
get_subprocesses + get_subprocesses, reap_zombies from pilot.util.timing import get_time_since from pilot.util.workernode import get_local_disk_space, check_hz from pilot.info import infosys @@ -89,6 +91,12 @@ def job_monitor_tasks(job, mt, args): if exit_code != 0: return exit_code, diagnostics + # check lease time in stager/pod mode on Kubernetes + if args.workflow == 'stager': + exit_code, diagnostics = check_lease_time(current_time, mt, args.leasetime) + if exit_code != 0: + return exit_code, diagnostics + # is it time to verify the pilot running time? # exit_code, diagnostics = verify_pilot_running_time(current_time, mt, job) # if exit_code != 0: @@ -209,8 +217,8 @@ def verify_memory_usage(current_time, mt, job, debug=False): :return: exit code (int), error diagnostics (string). """ - if debug: - show_memory_usage() + #if debug: + # show_memory_usage() pilot_user = os.environ.get('PILOT_USER', 'generic').lower() memory = __import__('pilot.user.%s.memory' % pilot_user, globals(), locals(), [pilot_user], 0) @@ -317,6 +325,13 @@ def verify_looping_job(current_time, mt, job, args): return 0, "" if current_time - mt.get('ct_looping') > looping_verification_time: + + # remove any lingering defunct processes + try: + reap_zombies() + except Exception as exc: + logger.warning(f'reap_zombies threw an exception: {exc}') + # is the job looping? try: exit_code, diagnostics = looping_job(job, mt) @@ -338,14 +353,41 @@ def verify_looping_job(current_time, mt, job, args): return 0, "" +def check_lease_time(current_time, mt, leasetime): + """ + Check the lease time in stager mode. + + :param current_time: current time at the start of the monitoring loop (int) + :param mt: measured time object + :param leasetime: lease time in seconds (int) + :return: exit code (int), error diagnostics (string). + """ + + exit_code = 0 + diagnostics = '' + if current_time - mt.get('ct_lease') > 10: + # time to check the lease time + + logger.debug(f'checking lease time (lease time={leasetime})') + if current_time - mt.get('ct_start') > leasetime: + diagnostics = f"lease time is up: {current_time - mt.get('ct_start')} s has passed since start - abort stager pilot" + logger.warning(diagnostics) + exit_code = errors.LEASETIME + + # update the ct_lease with the current time + mt.update('ct_lease') + + return exit_code, diagnostics + + def verify_disk_usage(current_time, mt, job): """ Verify the disk usage. The function checks 1) payload stdout size, 2) local space, 3) work directory size, 4) output file sizes. - :param current_time: current time at the start of the monitoring loop (int). - :param mt: measured time object. - :param job: job object. + :param current_time: current time at the start of the monitoring loop (int) + :param mt: measured time object + :param job: job object :return: exit code (int), error diagnostics (string). """ @@ -436,6 +478,10 @@ def utility_monitor(job): # make sure the subprocess is still running utproc = job.utilities[utcmd][0] if not utproc.poll() is None: + + # clean up the process + kill_process(utproc) + if job.state == 'finished' or job.state == 'failed' or job.state == 'stageout': logger.debug('no need to restart utility command since payload has finished running') continue @@ -467,6 +513,27 @@ def utility_monitor(job): time.sleep(10) +def kill_process(process: Any): + """ + Kill process before restart to get rid of defunct processes. 
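# Example sketch, not part of the patch: the lease-time pattern that check_lease_time() above
# implements with the MonitoringTime object - record a start time and abort the stager pilot
# once the elapsed time exceeds the lease. The class name and the default lease are illustrative.
import time

class LeaseTimer:
    def __init__(self, leasetime: int = 3600):
        self.start = int(time.time())
        self.leasetime = leasetime

    def expired(self) -> bool:
        """Return True once the lease granted to this pilot has run out."""
        return int(time.time()) - self.start > self.leasetime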
+ + :param process: process object + """ + + diagnostics = '' + try: + logger.warning('killing lingering subprocess') + process.kill() + except ProcessLookupError as exc: + diagnostics += f'\n(kill process group) ProcessLookupError={exc}' + try: + logger.warning('killing lingering process') + os.kill(process.pid, SIGKILL) + except ProcessLookupError as exc: + diagnostics += f'\n(kill process) ProcessLookupError={exc}' + logger.warning(f'sent hard kill signal - final stderr: {diagnostics}') + + def get_local_size_limit_stdout(bytes=True): """ Return a proper value for the local size limit for payload stdout (from config file). @@ -795,6 +862,10 @@ def store_subprocess_pids(job): :return: """ + # only store the pid once + if job.subprocesses: + return + # is the payload running? if job.pid: # get all subprocesses diff --git a/pilot/util/monitoringtime.py b/pilot/util/monitoringtime.py index 25a83cc91..4c7563699 100644 --- a/pilot/util/monitoringtime.py +++ b/pilot/util/monitoringtime.py @@ -24,6 +24,7 @@ def __init__(self): """ ct = int(time.time()) + self.ct_start = ct self.ct_proxy = ct self.ct_looping = ct self.ct_looping_last_touched = None @@ -32,6 +33,7 @@ def __init__(self): self.ct_process = ct self.ct_heartbeat = ct self.ct_kill = ct + self.ct_lease = ct def update(self, key, modtime=None): """ diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 8ff7de273..be7869e51 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -14,15 +14,48 @@ import threading from pilot.util.container import execute -from pilot.util.auxiliary import whoami +from pilot.util.auxiliary import whoami, grep_str from pilot.util.filehandling import read_file, remove_dir_tree from pilot.util.processgroups import kill_process_group +from pilot.util.timer import timeout import logging logger = logging.getLogger(__name__) -def find_processes_in_group(cpids, pid): +def find_processes_in_group(cpids, pid, ps_cache): + """ + Find all processes that belong to the same group using the given ps command output. + Recursively search for the children processes belonging to pid and return their pid's. + pid is the parent pid and cpids is a list that has to be initialized before calling this function and it contains + the pids of the children AND the parent. + + ps_cache is expected to be the output from the command "ps -eo pid,ppid -m". + + :param cpids: list of pid's for all child processes to the parent pid, as well as the parent pid itself (int). + :param pid: parent process id (int). + :param ps_cache: ps command output (string). + :return: (updated cpids input parameter list). + """ + + if not pid: + return + + cpids.append(pid) + lines = grep_str([str(pid)], ps_cache) + + if lines and lines != ['']: + for i in range(0, len(lines)): + try: + thispid = int(lines[i].split()[0]) + thisppid = int(lines[i].split()[1]) + except Exception as error: + logger.warning(f'exception caught: {error}') + if thisppid == pid: + find_processes_in_group(cpids, thispid, ps_cache) + + +def find_processes_in_group_old(cpids, pid): """ Find all processes that belong to the same group. Recursively search for the children processes belonging to pid and return their pid's. 
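# Example sketch, not part of the patch: the ps-cache idea used by the reworked
# find_processes_in_group() above - run "ps -eo pid,ppid" once and walk the parent/child
# relations in memory instead of invoking ps at every recursion level. The helper name is made up.
import subprocess

def descendants(pid: int) -> list:
    """Return pid plus all of its descendant process ids, using a single ps snapshot."""
    out = subprocess.check_output(["ps", "-eo", "pid,ppid"], encoding="utf-8")
    children = {}
    for line in out.splitlines()[1:]:  # skip the "PID PPID" header
        try:
            child, parent = (int(field) for field in line.split())
        except ValueError:
            continue  # malformed line
        children.setdefault(parent, []).append(child)

    found, stack = [pid], [pid]
    while stack:
        for child in children.get(stack.pop(), []):
            found.append(child)
            stack.append(child)
    return found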
@@ -40,7 +73,7 @@ def find_processes_in_group(cpids, pid): cpids.append(pid) cmd = "ps -eo pid,ppid -m | grep %d" % pid - exit_code, psout, stderr = execute(cmd, mute=True) + _, psout, _ = execute(cmd, mute=True) lines = psout.split("\n") if lines != ['']: @@ -64,7 +97,7 @@ def is_zombie(pid): status = False cmd = "ps aux | grep %d" % (pid) - exit_code, stdout, stderr = execute(cmd, mute=True) + _, stdout, _ = execute(cmd, mute=True) if "" in stdout: status = True @@ -132,7 +165,6 @@ def kill_processes(pid): Kill process belonging to the process group that the given pid belongs to. :param pid: process id (int). - :return: """ # if there is a known subprocess pgrp, then it should be enough to kill the group in one go @@ -147,7 +179,8 @@ def kill_processes(pid): if not status: # firstly find all the children process IDs to be killed children = [] - find_processes_in_group(children, pid) + _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + find_processes_in_group(children, pid, ps_cache) # reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated) if not children: @@ -182,6 +215,44 @@ def kill_processes(pid): # if orphan process killing is not desired, set env var PILOT_NOKILL kill_orphans() + # kill any lingering defunct processes + try: + kill_defunct_children(pid) + except Exception as exc: + logger.warning(f'exception caught: {exc}') + + +def kill_defunct_children(pid): + """ + Kills any defunct child processes of the specified process ID. + + :param pid: process id (int). + """ + + defunct_children = [] + for proc in os.listdir("/proc"): + if proc.isdigit(): + try: + cmdline = os.readlink(f"/proc/{proc}/cmdline") + except Exception: + # ignore lines that do not have cmdline + continue + if not cmdline or cmdline.startswith("/bin/init"): + continue + pinfo = os.readlink(f"/proc/{proc}/status") + if pinfo.startswith("Z") and os.readlink(f"/proc/{proc}/parent") == str(pid): + defunct_children.append(int(proc)) + + if defunct_children: + logger.info(f'will now remove defunct processes: {defunct_children}') + else: + logger.info(f'did not find any defunct processes belonging to {pid}') + for child_pid in defunct_children: + try: + os.kill(child_pid, signal.SIGKILL) + except ProcessLookupError: + pass + def kill_child_processes(pid): """ @@ -192,7 +263,8 @@ def kill_child_processes(pid): """ # firstly find all the children process IDs to be killed children = [] - find_processes_in_group(children, pid) + _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + find_processes_in_group(children, pid, ps_cache) # reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated) children.reverse() @@ -220,7 +292,7 @@ def kill_child_processes(pid): kill_process(i) -def kill_process(pid): +def kill_process(pid, hardkillonly=False): """ Kill process. 
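# Example sketch, not part of the patch: another way to find defunct (zombie) children than the
# /proc walk in kill_defunct_children() above - read /proc/<pid>/stat, where the field after the
# "(comm)" part is the state ('Z' for zombie) and the next one is the parent pid.
# Note that a true zombie cannot be removed with a signal; it only disappears once its parent
# waits for it (see reap_zombies() further down).
import os

def zombie_children(parent_pid: int) -> list:
    """Return the pids of all zombie processes whose parent is parent_pid."""
    zombies = []
    for entry in os.listdir("/proc"):
        if not entry.isdigit():
            continue
        try:
            with open(f"/proc/{entry}/stat") as stat_file:
                # /proc/<pid>/stat: "pid (comm) state ppid ..."; comm may contain spaces,
                # so split once after the closing parenthesis
                fields = stat_file.read().rsplit(")", 1)[1].split()
        except OSError:
            continue  # the process vanished or is not readable
        if fields[0] == "Z" and int(fields[1]) == parent_pid:
            zombies.append(int(entry))
    return zombies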
@@ -228,14 +300,13 @@ def kill_process(pid): :return: boolean (True if successful SIGKILL) """ - status = False - # start with soft kill (ignore any returned status) - kill(pid, signal.SIGTERM) + if not hardkillonly: + kill(pid, signal.SIGTERM) - _t = 10 - logger.info("sleeping %d s to allow process to exit", _t) - time.sleep(_t) + _t = 10 + logger.info("sleeping %d s to allow process to exit", _t) + time.sleep(_t) # now do a hard kill just in case some processes haven't gone away status = kill(pid, signal.SIGKILL) @@ -276,7 +347,8 @@ def get_number_of_child_processes(pid): children = [] n = 0 try: - find_processes_in_group(children, pid) + _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + find_processes_in_group(children, pid, ps_cache) except Exception as error: logger.warning("exception caught in find_processes_in_group: %s", error) else: @@ -512,7 +584,8 @@ def get_current_cpu_consumption_time(pid): # get all the child processes children = [] - find_processes_in_group(children, pid) + _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + find_processes_in_group(children, pid, ps_cache) cpuconsumptiontime = 0 for _pid in children: @@ -804,3 +877,123 @@ def get_subprocesses(pid): cmd = f'ps -opid --no-headers --ppid {pid}' _, out, _ = execute(cmd) return [int(line) for line in out.splitlines()] if out else [] + + +def identify_numbers_and_strings(string): + """Identifies numbers and strings in a given string. + + Args: + string: The string to be processed. + + Returns: + A list of tuples, where each tuple contains the matched numbers and strings. + """ + + pattern = r'(\d+)\s+(\d+)\s+([A-Za-z]+)\s+([A-Za-z]+)' + return re.findall(pattern, string) + + +def find_zombies(parent_pid): + """ + Find all zombies/defunct processes under the given parent pid. + + :param parent_pid: parent pid (int). + """ + + zombies = {} + cmd = 'ps -eo pid,ppid,stat,comm' + ec, stdout, _ = execute(cmd) + for line in stdout.split('\n'): + matches = identify_numbers_and_strings(line) + if matches: + pid = int(matches[0][0]) + ppid = int(matches[0][1]) + stat = matches[0][2] + comm = matches[0][3] + #print(f'pid={pid} ppid={ppid} stat={stat} comm={comm}') + if ppid == parent_pid and stat.startswith('Z'): + if not zombies.get(parent_pid): + zombies[parent_pid] = [] + zombies[parent_pid].append([pid, stat, comm]) + + return zombies + + +def handle_zombies(zombies, job=None): + """ + Dump some info about the given zombies. + + :param zombies: list of zombies. 
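# Example usage, not part of the patch, of the pattern used by identify_numbers_and_strings()
# above, applied to one illustrative line of "ps -eo pid,ppid,stat,comm" output.
import re

pattern = r'(\d+)\s+(\d+)\s+([A-Za-z]+)\s+([A-Za-z]+)'
print(re.findall(pattern, ' 12345  6789 Zs   bash'))  # [('12345', '6789', 'Zs', 'bash')]
# note: the last group only captures letters, so a command name like "python3" comes back as "python"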
+ :param job: if job object is given, then the zombie pid will be added to the job.zombies list + """ + + for parent in zombies: + #logger.info(f'sending SIGCHLD to ppid={parent}') + #kill(parent, signal.SIGCHLD) + for zombie in zombies.get(parent): + pid = zombie[0] + # stat = zombie[1] + comm = zombie[2] + logger.info(f'zombie process {pid} (comm={comm}, ppid={parent})') + # kill_process(pid, hardkillonly=True) # useless for zombies - they are already dead + if job: + job.zombies.append(pid) + + +def get_child_processes(parent_pid): + child_processes = [] + + # Iterate through all directories in /proc + for pid in os.listdir('/proc'): + if not pid.isdigit(): + continue # Skip non-numeric directories + + pid = int(pid) + try: + # Read the command line of the process + with open(f'/proc/{pid}/cmdline', 'rb') as cmdline_file: + cmdline = cmdline_file.read().decode().replace('\x00', ' ') + + # Read the parent PID of the process + with open(f'/proc/{pid}/stat', 'rb') as stat_file: + stat_info = stat_file.read().decode() + parts = stat_info.split() + ppid = int(parts[3]) + + # Check if the parent PID matches the specified parent process + if ppid == parent_pid: + child_processes.append((pid, cmdline)) + + except (FileNotFoundError, PermissionError): + continue # Process may have terminated or we don't have permission + + return child_processes + + +def reap_zombies(pid: int = -1): + """ + Check for and reap zombie processes. + + This function can be called by the monitoring loop. Using PID -1 in os.waitpid() means that the request pertains to + any child of the current process. + + :param pid: process id (int). + """ + + max_timeout = 20 + + @timeout(seconds=max_timeout) + def waitpid(pid: int = -1): + try: + while True: + _pid, status = os.waitpid(pid, os.WNOHANG) + if _pid == 0: + break + # Handle the terminated process here + if os.WIFEXITED(status): + exit_code = os.WEXITSTATUS(status) + logger.info(f'pid={_pid} exited with {exit_code}') + except ChildProcessError: + pass + logger.info(f'reaping zombies for max {max_timeout} seconds') + waitpid(pid) diff --git a/pilot/util/processgroups.py b/pilot/util/processgroups.py index 43007e1cf..bd1993bda 100644 --- a/pilot/util/processgroups.py +++ b/pilot/util/processgroups.py @@ -8,6 +8,8 @@ # - Paul Nilsson, paul.nilsson@cern.ch, 2023 import os +import re +import subprocess from signal import SIGTERM, SIGKILL from time import sleep @@ -37,7 +39,7 @@ def kill_process_group(pgrp): logger.info(f"SIGTERM sent to process group {pgrp}") if _sleep: - nap = 30 + nap = 10 logger.info(f"sleeping {nap} s to allow processes to exit") sleep(nap) @@ -50,3 +52,78 @@ def kill_process_group(pgrp): status = True return status + + +def kill_defunct_process(pid): + """ + Kills a process if it is in a defunct state. + + :param pid: main process PID (int). + """ + + try: + cmd = f"ps -p {pid} -o stat" + logger.info(f'executing command with os.popen(): {cmd}') + process = os.popen(cmd) + output = process.read().strip() + process.close() + except Exception as exc: + logger.warning(f'caught exception: {exc}') + else: + for line in output.split('\n'): + if line.upper().startswith('Z') or line.upper().endswith('Z'): + # The process is in a defunct state. 
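# Example sketch, not part of the patch: the non-blocking reap loop that reap_zombies() above
# wraps in a timeout. os.waitpid(-1, os.WNOHANG) collects any already-terminated child of the
# current process without blocking; a zombie only disappears once it has been waited for.
import os

def reap_once() -> list:
    """Reap all currently waitable children and return their (pid, exit_code) pairs."""
    reaped = []
    try:
        while True:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0:
                break  # children exist, but none of them has exited yet
            if os.WIFEXITED(status):
                reaped.append((pid, os.WEXITSTATUS(status)))
    except ChildProcessError:
        pass  # no child processes left at all
    return reaped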
+ logger.info(f'killing defunct process {pid}') + os.kill(pid, SIGKILL) + + +def get_all_child_pids(parent_pid): + + def extract_pids_from_string(s): + pid_pattern = re.compile(r'\((\d+)\)') + pids = pid_pattern.findall(s) + return [int(pid) for pid in pids] + try: + output = subprocess.check_output(["pstree", "-p", str(parent_pid)], universal_newlines=True) + logger.debug(output) + pids = extract_pids_from_string(output) + return pids + except subprocess.CalledProcessError: + return [] + + +def is_defunct(pid): + def get_process_info(pid): + process = subprocess.run(['ps', '-o', 'stat=', '-p', str(pid)], capture_output=True, text=True) + return process.returncode, process.stdout, process.stderr + + try: + #cmd = f'ps -p {pid} -o pid,vsz=MEMORY -o user,group=GROUP -o comm,args=ARGS' + #result = subprocess.run(cmd.split(' '), capture_output=True, text=True) + #logger.debug(f'{cmd}: {result}') + + returncode, stdout, stderr = get_process_info(pid) + logger.debug(f'{pid}: return code={returncode}, stdout={stdout}, stderr={stderr}') + + return 'Z' in stdout.strip() + except subprocess.CalledProcessError: + return False + + +def find_defunct_subprocesses(parent_pid): + """ + Finds all defunct subprocesses to the given process. + + :param pid: main process PID (int). + :return: A list of all defunct subprocesses to the given process. + """ + + child_pids = get_all_child_pids(parent_pid) + logger.info(f'child pids={child_pids}') + defunct_subprocesses = [] + + for child_pid in child_pids: + if is_defunct(child_pid): + defunct_subprocesses.append(child_pid) + + return defunct_subprocesses diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index e6a011226..1ecded277 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -145,11 +145,12 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO): if isinstance(secrets, str): secrets = json.loads(secrets) + ssl_enable, ssl_verify = self.get_rtlogging_ssl() transport = HttpTransport( server, port, - ssl_enable=config.Pilot.ssl_enable, - ssl_verify=config.Pilot.ssl_verify, + ssl_enable=ssl_enable, + ssl_verify=ssl_verify, timeout=5.0, username=secrets.get('logstash_login', 'unknown_login'), password=secrets.get('logstash_password', 'unknown_password') @@ -285,3 +286,21 @@ def sending_logs(self, args, job): else: self.send_loginfiles() # send the remaining logs after the job completion self.close_files() + + def get_rtlogging_ssl(self): + """ + Return the proper rtlogging value from the experiment specific plug-in or the config file. + + :return: ssl_enable (bool), ssl_verify (bool) (tuple). 
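# Example usage, not part of the patch, of the pid extraction used by get_all_child_pids() above:
# "pstree -p" prints every process with its pid in parentheses and the regex pulls the numbers out.
# The sample line below is illustrative.
import re

sample = "python3(12345)---sh(12346)---athena.py(12350)"
print(re.findall(r'\((\d+)\)', sample))  # ['12345', '12346', '12350']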
+ """ + + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + try: + user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) + ssl_enable, ssl_verify = user.get_rtlogging_ssl() + except Exception: + ssl_enable = config.Pilot.ssl_enable + ssl_verify = config.Pilot.ssl_verify + logger.warning(f'found no experiment specific ssl_enable, ssl_verify, using config values ({ssl_enable}, {ssl_verify})') + + return ssl_enable, ssl_verify diff --git a/pilot/util/timer.py b/pilot/util/timer.py index 6b58d4f6b..3c591e885 100644 --- a/pilot/util/timer.py +++ b/pilot/util/timer.py @@ -25,22 +25,10 @@ import traceback import threading -import multiprocessing from queue import Empty from functools import wraps - - -class TimeoutException(Exception): - - def __init__(self, message, timeout=None, *args): - self.timeout = timeout - self.message = message - self._errorCode = 1334 - super(TimeoutException, self).__init__(*args) - - def __str__(self): - return "%s: %s, timeout=%s seconds%s" % (self.__class__.__name__, self.message, self.timeout, ' : %s' % repr(self.args) if self.args else '') +from pilot.util.auxiliary import TimeoutException class TimedThread(object): @@ -126,6 +114,9 @@ def _execute(func, args, kwargs, queue): traceback.print_exc(file=sys.stderr) queue.put((False, e)) + # do not put this import at the top since it can possibly interfere with some modules (esp. Google Cloud Logging modules) + import multiprocessing + queue = multiprocessing.Queue(1) process = multiprocessing.Process(target=_execute, args=(func, args, kwargs, queue)) process.daemon = True diff --git a/pilot/util/timing.py b/pilot/util/timing.py index f0f2f25ee..29587964c 100644 --- a/pilot/util/timing.py +++ b/pilot/util/timing.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023 # Note: The Pilot 2 modules that need to record timing measurements, can do so using the add_to_pilot_timing() function. # When the timing measurements need to be recorded, the high-level functions, e.g. get_getjob_time(), can be used. @@ -19,11 +19,29 @@ import time from pilot.util.config import config -from pilot.util.constants import PILOT_START_TIME, PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_PRE_SETUP, \ - PILOT_POST_SETUP, PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD, PILOT_PRE_STAGEOUT,\ - PILOT_POST_STAGEOUT, PILOT_PRE_FINAL_UPDATE, PILOT_POST_FINAL_UPDATE, PILOT_END_TIME, PILOT_MULTIJOB_START_TIME -from pilot.util.filehandling import read_json, write_json -#from pilot.util.mpi import get_ranks_info +from pilot.util.constants import ( + PILOT_START_TIME, + PILOT_PRE_GETJOB, + PILOT_POST_GETJOB, + PILOT_PRE_SETUP, + PILOT_POST_SETUP, + PILOT_PRE_STAGEIN, + PILOT_POST_STAGEIN, + PILOT_PRE_PAYLOAD, + PILOT_POST_PAYLOAD, + PILOT_PRE_STAGEOUT, + PILOT_POST_STAGEOUT, + PILOT_PRE_FINAL_UPDATE, + PILOT_POST_FINAL_UPDATE, + PILOT_END_TIME, + PILOT_MULTIJOB_START_TIME, + PILOT_PRE_LOG_TAR, + PILOT_POST_LOG_TAR +) +from pilot.util.filehandling import ( + read_json, + write_json +) import logging logger = logging.getLogger(__name__) @@ -148,6 +166,18 @@ def get_stageout_time(job_id, args): return get_time_difference(job_id, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT, args) +def get_log_creation_time(job_id, args): + """ + High level function that returns the time for creating the job log for the given job_id. + + :param job_id: PanDA job id (string). 
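# Example sketch, not part of the patch: the lookup-with-fallback pattern that
# get_rtlogging_ssl() above uses - try the experiment-specific module selected via PILOT_USER
# and fall back to a default when the module or the function is missing. Names are illustrative.
import os

def setting_from_plugin(function_name: str, default):
    """Call function_name() in the experiment plug-in if available, otherwise return default."""
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    try:
        user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)
        return getattr(user, function_name)()
    except (ImportError, AttributeError):
        return default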
+ :param args: pilot arguments. + :return: time in seconds (int). + """ + + return get_time_difference(job_id, PILOT_PRE_LOG_TAR, PILOT_POST_LOG_TAR, args) + + def get_payload_execution_time(job_id, args): """ High level function that returns the time for the payload execution for the given job_id. @@ -349,6 +379,8 @@ def timing_report(job_id, args): time_stagein = get_stagein_time(job_id, args) time_payload = get_payload_execution_time(job_id, args) time_stageout = get_stageout_time(job_id, args) + time_log_creation = get_log_creation_time(job_id, args) + logger.info('.' * 30) logger.info('. Timing measurements:') logger.info(f'. get job = {time_getjob} s') @@ -358,9 +390,10 @@ def timing_report(job_id, args): logger.info(f'. stage-in = {time_stagein} s') logger.info(f'. payload execution = {time_payload} s') logger.info(f'. stage-out = {time_stageout} s') + logger.info(f'. log creation = {time_log_creation} s') logger.info('.' * 30) - return time_getjob, time_stagein, time_payload, time_stageout, time_initial_setup, time_setup + return time_getjob, time_stagein, time_payload, time_stageout, time_initial_setup, time_setup, time_log_creation def time_stamp(): diff --git a/pilot/util/tracereport.py b/pilot/util/tracereport.py index a274a83b5..d2dfb1345 100644 --- a/pilot/util/tracereport.py +++ b/pilot/util/tracereport.py @@ -87,7 +87,7 @@ def init(self, job): self.update(data) self['timeStart'] = time.time() - hostname = os.environ.get('PAMDA_HOSTNAME', socket.gethostname()) + hostname = os.environ.get('PANDA_HOSTNAME', socket.gethostname()) try: self['hostname'] = socket.gethostbyaddr(hostname)[0] except Exception: @@ -172,7 +172,7 @@ def send(self): command += ' -4' cmd = f'{command} --connect-timeout 20 --max-time 120 --cacert {ssl_certificate} -v -k -d \"{data}\" {url}' - exit_code, stdout, stderr = execute(cmd, mute=False) + exit_code, stdout, stderr = execute(cmd, mute=False, timeout=300) logger.debug(f'exit_code={exit_code}, stdout={stdout}, stderr={stderr}') if exit_code or 'ExceptionClass' in stdout: logger.warning('failed to send traces to rucio: %s' % stdout) diff --git a/pilot/workflow/stager.py b/pilot/workflow/stager.py index 8e8c931a6..daa341220 100644 --- a/pilot/workflow/stager.py +++ b/pilot/workflow/stager.py @@ -93,7 +93,7 @@ def run(args): signal.signal(signal.SIGBUS, functools.partial(interrupt, args)) logger.info('setting up queues') - queues = namedtuple('queues', ['jobs', 'data_in', 'data_out', 'current_data_in', 'validated_jobs', + queues = namedtuple('queues', ['jobs', 'data_in', 'data_out', 'current_data_in', 'validated_jobs', 'monitored_payloads', 'finished_jobs', 'finished_data_in', 'finished_data_out', 'completed_jobids', 'failed_jobs', 'failed_data_in', 'failed_data_out', 'completed_jobs']) @@ -103,6 +103,7 @@ def run(args): queues.current_data_in = queue.Queue() queues.validated_jobs = queue.Queue() + queues.monitored_payloads = queue.Queue() queues.finished_jobs = queue.Queue() queues.finished_data_in = queue.Queue()
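# Example sketch, not part of the patch: the queue bundle used by the stager workflow above.
# The namedtuple type is never instantiated here; it simply acts as a namespace onto which one
# queue.Queue() per message stream is attached (including the new monitored_payloads queue).
# The field list is shortened for brevity.
import queue
from collections import namedtuple

queues = namedtuple('queues', ['jobs', 'validated_jobs', 'monitored_payloads', 'completed_jobs'])
queues.jobs = queue.Queue()
queues.validated_jobs = queue.Queue()
queues.monitored_payloads = queue.Queue()
queues.completed_jobs = queue.Queue()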