diff --git a/PILOTVERSION b/PILOTVERSION index 8dfd186a..96fd1f25 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.9.2 \ No newline at end of file +3.6.9.10 \ No newline at end of file diff --git a/pilot/control/job.py b/pilot/control/job.py index 08200f66..957fe3a5 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -3001,7 +3001,7 @@ def preliminary_server_update(job, args, diagnostics): :param diagnostics: error diagnostics (string). """ - logger.debug(f'could have sent diagnostics={diagnostics}') + logger.warning(f'will send preliminary diagnostics (and pretend job is still running)={diagnostics}') piloterrorcode = job.piloterrorcode piloterrorcodes = job.piloterrorcodes piloterrordiags = job.piloterrordiags diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index 6479464b..60cb220b 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -575,7 +575,7 @@ def get_payload_command(self, job): try: pilot_user = os.environ.get('PILOT_USER', 'generic').lower() user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) - cmd = user.get_payload_command(job) # + 'sleep 900' # to test looping jobs + cmd = user.get_payload_command(job) #+ 'sleep 900' # to test looping jobs except PilotException as error: self.post_setup(job) import traceback diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 7557d7c0..cd238ad4 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -14,7 +14,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '9' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '2' # build number should be reset to '1' for every new development cycle +BUILD = '10' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/loopingjob.py b/pilot/util/loopingjob.py index 607fa81b..8fd47209 100644 --- a/pilot/util/loopingjob.py +++ b/pilot/util/loopingjob.py @@ -13,7 +13,7 @@ from pilot.util.container import execute #, execute_command from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy, list_mod_files from pilot.util.parameters import convert_to_int -from pilot.util.processes import kill_processes, find_zombies, handle_zombies, reap_zombies +from pilot.util.processes import kill_process, find_zombies, handle_zombies, reap_zombies from pilot.util.psutils import get_child_processes from pilot.util.timing import time_stamp @@ -200,21 +200,14 @@ def kill_looping_job(job): _, stdout, _ = execute(cmd, mute=True) logger.info(f"{cmd} + '\n': {stdout}") - parent_pid = os.getpid() - child_processes = get_child_processes(parent_pid) - if child_processes: - logger.info(f"child processes of PID {parent_pid}:") - for pid, cmdline in child_processes: - logger.info(f"PID {pid}: {cmdline}") - # set the relevant error code if job.state == 'stagein': - job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT) + job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT, priority=True) elif job.state == 'stageout': - job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT) + job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT, priority=True) else: # most likely in the 'running' state, but use the catch-all 'else' - job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB) + job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB, priority=True) set_pilot_state(job=job, state="failed") # remove any lingering input files from the work dir @@ -224,7 +217,19 @@ def kill_looping_job(job): if _ec != 0: logger.warning('failed to remove all files') - kill_processes(job.pid) + parent_pid = os.getpid() + #logger.info(f'killing main command process id in case it is still running: {job.pid}') + #kill_processes(job.pid) + + child_processes = get_child_processes(parent_pid) + if child_processes: + logger.info(f"child processes of pilot (PID {parent_pid}) to be killed:") + for pid, cmdline in child_processes: + logger.info(f"PID {pid}: {cmdline}") + #_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + for pid, _ in child_processes: + #kill_processes(pid, korphans=False, ps_cache=ps_cache, nap=1) + kill_process(pid) def get_looping_job_limit(): diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index ae2a367c..3920cec6 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -366,6 +366,7 @@ def verify_looping_job(current_time, mt, job, args): # only perform looping job check if desired and enough time has passed since start pilot_user = os.environ.get('PILOT_USER', 'generic').lower() loopingjob_definitions = __import__('pilot.user.%s.loopingjob_definitions' % pilot_user, globals(), locals(), [pilot_user], 0) + runcheck = loopingjob_definitions.allow_loopingjob_detection() if not job.looping_check and runcheck: logger.debug('looping check not desired') @@ -373,6 +374,7 @@ def verify_looping_job(current_time, mt, job, args): time_since_start = get_time_since(job.jobid, PILOT_PRE_PAYLOAD, args) # payload walltime looping_verification_time = convert_to_int(config.Pilot.looping_verification_time, default=600) + if time_since_start < looping_verification_time: logger.debug(f'no point in running looping job algorithm since time since last payload start={time_since_start} s < ' f'looping verification time={looping_verification_time} s') diff --git a/pilot/util/processes.py b/pilot/util/processes.py index d0fdfbca..8de9fc53 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -160,11 +160,13 @@ def dump_stack_trace(pid): logger.info("skipping pstack dump for zombie process") -def kill_processes(pid): +def kill_processes(pid, korphans=True, ps_cache=None, nap=10): """ Kill process belonging to the process group that the given pid belongs to. - :param pid: process id (int). + :param pid: process id (int) + :param nap: napping time between kill signals in seconds (int) + :param korphans: kill orphans (bool). """ # if there is a known subprocess pgrp, then it should be enough to kill the group in one go @@ -174,12 +176,13 @@ def kill_processes(pid): except Exception: pgrp = 0 if pgrp != 0: - status = kill_process_group(pgrp) + status = kill_process_group(pgrp, nap=nap) if not status: # firstly find all the children process IDs to be killed children = [] - _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + if not ps_cache: + _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) find_processes_in_group(children, pid, ps_cache) # reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated) @@ -213,7 +216,8 @@ def kill_processes(pid): # kill any remaining orphan processes # note: this should no longer be necessary since ctypes has made sure all subprocesses are parented # if orphan process killing is not desired, set env var PILOT_NOKILL - kill_orphans() + if korphans: + kill_orphans() # kill any lingering defunct processes try: @@ -254,7 +258,7 @@ def kill_defunct_children(pid): pass -def kill_child_processes(pid): +def kill_child_processes(pid, ps_cache=None): """ Kill child processes. @@ -263,7 +267,8 @@ def kill_child_processes(pid): """ # firstly find all the children process IDs to be killed children = [] - _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + if not ps_cache: + _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) find_processes_in_group(children, pid, ps_cache) # reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated) @@ -304,7 +309,7 @@ def kill_process(pid, hardkillonly=False): if not hardkillonly: kill(pid, signal.SIGTERM) - _t = 10 + _t = 3 logger.info("sleeping %d s to allow process to exit", _t) time.sleep(_t) diff --git a/pilot/util/processgroups.py b/pilot/util/processgroups.py index bd1993bd..f35ec625 100644 --- a/pilot/util/processgroups.py +++ b/pilot/util/processgroups.py @@ -17,11 +17,12 @@ logger = logging.getLogger(__name__) -def kill_process_group(pgrp): +def kill_process_group(pgrp, nap=10): """ Kill the process group. DO NOT MOVE TO PROCESSES.PY - will lead to circular import since execute() needs it as well. :param pgrp: process group id (int). + :param nap: napping time between kill signals in seconds (int) :return: boolean (True if SIGTERM followed by SIGKILL signalling was successful) """ @@ -39,7 +40,6 @@ def kill_process_group(pgrp): logger.info(f"SIGTERM sent to process group {pgrp}") if _sleep: - nap = 10 logger.info(f"sleeping {nap} s to allow processes to exit") sleep(nap) diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index 26b0dec6..d3cf2f65 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -147,18 +147,19 @@ def get_all_descendant_processes(parent_pid): Recursively find child processes using the given parent pid as a starting point. :param parent_pid: parent process id (int) - :return: descendant process ids (list). + :return: descendant process ids and cmdline (list). """ def find_descendant_processes(pid): try: descendants = [] - for process in psutil.process_iter(attrs=['pid', 'ppid']): + for process in psutil.process_iter(attrs=['pid', 'ppid', 'cmdline']): process_info = process.info child_pid = process_info['pid'] ppid = process_info['ppid'] + cmdline = process_info['cmdline'] if ppid == pid: - descendants.append(child_pid) + descendants.append((child_pid, cmdline)) descendants.extend(find_descendant_processes(child_pid)) return descendants except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):