diff --git a/PILOTVERSION b/PILOTVERSION index e20a74ab..96fd1f25 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.8.29 \ No newline at end of file +3.6.9.10 \ No newline at end of file diff --git a/pilot/control/job.py b/pilot/control/job.py index 08200f66..957fe3a5 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -3001,7 +3001,7 @@ def preliminary_server_update(job, args, diagnostics): :param diagnostics: error diagnostics (string). """ - logger.debug(f'could have sent diagnostics={diagnostics}') + logger.warning(f'will send preliminary diagnostics (and pretend job is still running)={diagnostics}') piloterrorcode = job.piloterrorcode piloterrorcodes = job.piloterrorcodes piloterrordiags = job.piloterrordiags diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index 6479464b..60cb220b 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -575,7 +575,7 @@ def get_payload_command(self, job): try: pilot_user = os.environ.get('PILOT_USER', 'generic').lower() user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) - cmd = user.get_payload_command(job) # + 'sleep 900' # to test looping jobs + cmd = user.get_payload_command(job) #+ 'sleep 900' # to test looping jobs except PilotException as error: self.post_setup(job) import traceback diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 8d1433f9..cd238ad4 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -13,8 +13,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '8' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '29' # build number should be reset to '1' for every new development cycle +REVISION = '9' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '10' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/loopingjob.py b/pilot/util/loopingjob.py index 016403cc..8fd47209 100644 --- a/pilot/util/loopingjob.py +++ b/pilot/util/loopingjob.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-20223 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023 from pilot.common.errorcodes import ErrorCodes from pilot.util.auxiliary import whoami, set_pilot_state, cut_output, locate_core_file @@ -13,7 +13,8 @@ from pilot.util.container import execute #, execute_command from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy, list_mod_files from pilot.util.parameters import convert_to_int -from pilot.util.processes import kill_processes, find_zombies, handle_zombies, get_child_processes, reap_zombies +from pilot.util.processes import kill_process, find_zombies, handle_zombies, reap_zombies +from pilot.util.psutils import get_child_processes from pilot.util.timing import time_stamp import os @@ -199,21 +200,14 @@ def kill_looping_job(job): _, stdout, _ = execute(cmd, mute=True) logger.info(f"{cmd} + '\n': {stdout}") - parent_pid = os.getpid() - child_processes = get_child_processes(parent_pid) - if child_processes: - logger.info(f"child processes of PID {parent_pid}:") - for pid, cmdline in child_processes: - logger.info(f"PID {pid}: {cmdline}") - # set the relevant error code if job.state == 'stagein': - job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT) + job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT, priority=True) elif job.state == 'stageout': - job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT) + job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT, priority=True) else: # most likely in the 'running' state, but use the catch-all 'else' - job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB) + job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB, priority=True) set_pilot_state(job=job, state="failed") # remove any lingering input files from the work dir @@ -223,7 +217,19 @@ def kill_looping_job(job): if _ec != 0: logger.warning('failed to remove all files') - kill_processes(job.pid) + parent_pid = os.getpid() + #logger.info(f'killing main command process id in case it is still running: {job.pid}') + #kill_processes(job.pid) + + child_processes = get_child_processes(parent_pid) + if child_processes: + logger.info(f"child processes of pilot (PID {parent_pid}) to be killed:") + for pid, cmdline in child_processes: + logger.info(f"PID {pid}: {cmdline}") + #_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + for pid, _ in child_processes: + #kill_processes(pid, korphans=False, ps_cache=ps_cache, nap=1) + kill_process(pid) def get_looping_job_limit(): diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index ae2a367c..3920cec6 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -366,6 +366,7 @@ def verify_looping_job(current_time, mt, job, args): # only perform looping job check if desired and enough time has passed since start pilot_user = os.environ.get('PILOT_USER', 'generic').lower() loopingjob_definitions = __import__('pilot.user.%s.loopingjob_definitions' % pilot_user, globals(), locals(), [pilot_user], 0) + runcheck = loopingjob_definitions.allow_loopingjob_detection() if not job.looping_check and runcheck: logger.debug('looping check not desired') @@ -373,6 +374,7 @@ def verify_looping_job(current_time, mt, job, args): time_since_start = get_time_since(job.jobid, PILOT_PRE_PAYLOAD, args) # payload walltime looping_verification_time = convert_to_int(config.Pilot.looping_verification_time, default=600) + if time_since_start < looping_verification_time: logger.debug(f'no point in running looping job algorithm since time since last payload start={time_since_start} s < ' f'looping verification time={looping_verification_time} s') diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 398c22f2..8de9fc53 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -160,11 +160,13 @@ def dump_stack_trace(pid): logger.info("skipping pstack dump for zombie process") -def kill_processes(pid): +def kill_processes(pid, korphans=True, ps_cache=None, nap=10): """ Kill process belonging to the process group that the given pid belongs to. - :param pid: process id (int). + :param pid: process id (int) + :param nap: napping time between kill signals in seconds (int) + :param korphans: kill orphans (bool). """ # if there is a known subprocess pgrp, then it should be enough to kill the group in one go @@ -174,12 +176,13 @@ def kill_processes(pid): except Exception: pgrp = 0 if pgrp != 0: - status = kill_process_group(pgrp) + status = kill_process_group(pgrp, nap=nap) if not status: # firstly find all the children process IDs to be killed children = [] - _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + if not ps_cache: + _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) find_processes_in_group(children, pid, ps_cache) # reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated) @@ -213,7 +216,8 @@ def kill_processes(pid): # kill any remaining orphan processes # note: this should no longer be necessary since ctypes has made sure all subprocesses are parented # if orphan process killing is not desired, set env var PILOT_NOKILL - kill_orphans() + if korphans: + kill_orphans() # kill any lingering defunct processes try: @@ -254,7 +258,7 @@ def kill_defunct_children(pid): pass -def kill_child_processes(pid): +def kill_child_processes(pid, ps_cache=None): """ Kill child processes. @@ -263,7 +267,8 @@ def kill_child_processes(pid): """ # firstly find all the children process IDs to be killed children = [] - _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) + if not ps_cache: + _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True) find_processes_in_group(children, pid, ps_cache) # reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated) @@ -304,7 +309,7 @@ def kill_process(pid, hardkillonly=False): if not hardkillonly: kill(pid, signal.SIGTERM) - _t = 10 + _t = 3 logger.info("sleeping %d s to allow process to exit", _t) time.sleep(_t) @@ -940,36 +945,6 @@ def handle_zombies(zombies, job=None): job.zombies.append(pid) -def get_child_processes(parent_pid): - child_processes = [] - - # Iterate through all directories in /proc - for pid in os.listdir('/proc'): - if not pid.isdigit(): - continue # Skip non-numeric directories - - pid = int(pid) - try: - # Read the command line of the process - with open(f'/proc/{pid}/cmdline', 'rb') as cmdline_file: - cmdline = cmdline_file.read().decode().replace('\x00', ' ') - - # Read the parent PID of the process - with open(f'/proc/{pid}/stat', 'rb') as stat_file: - stat_info = stat_file.read().decode() - parts = stat_info.split() - ppid = int(parts[3]) - - # Check if the parent PID matches the specified parent process - if ppid == parent_pid: - child_processes.append((pid, cmdline)) - - except (FileNotFoundError, PermissionError): - continue # Process may have terminated or we don't have permission - - return child_processes - - def reap_zombies(pid: int = -1): """ Check for and reap zombie processes. diff --git a/pilot/util/processgroups.py b/pilot/util/processgroups.py index bd1993bd..f35ec625 100644 --- a/pilot/util/processgroups.py +++ b/pilot/util/processgroups.py @@ -17,11 +17,12 @@ logger = logging.getLogger(__name__) -def kill_process_group(pgrp): +def kill_process_group(pgrp, nap=10): """ Kill the process group. DO NOT MOVE TO PROCESSES.PY - will lead to circular import since execute() needs it as well. :param pgrp: process group id (int). + :param nap: napping time between kill signals in seconds (int) :return: boolean (True if SIGTERM followed by SIGKILL signalling was successful) """ @@ -39,7 +40,6 @@ def kill_process_group(pgrp): logger.info(f"SIGTERM sent to process group {pgrp}") if _sleep: - nap = 10 logger.info(f"sleeping {nap} s to allow processes to exit") sleep(nap) diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index 754a71be..d3cf2f65 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -124,3 +124,91 @@ def get_parent_pid(pid): return parent_pid except psutil.NoSuchProcess: return None + + +def get_child_processes(parent_pid): + """ + Return a list of all child processes belonging to the same parent process id. + Using a fallback to /proc/{pid} in case psutil is not available. + + :param parent_pid: parent process id (int) + :return: child processes (list). + """ + + if not _is_psutil_available: + logger.warning('get_child_processes(): psutil not available - using legacy code as a fallback') + return get_child_processes_legacy(parent_pid) + else: + return get_all_descendant_processes(parent_pid) + + +def get_all_descendant_processes(parent_pid): + """ + Recursively find child processes using the given parent pid as a starting point. + + :param parent_pid: parent process id (int) + :return: descendant process ids and cmdline (list). + """ + + def find_descendant_processes(pid): + try: + descendants = [] + for process in psutil.process_iter(attrs=['pid', 'ppid', 'cmdline']): + process_info = process.info + child_pid = process_info['pid'] + ppid = process_info['ppid'] + cmdline = process_info['cmdline'] + if ppid == pid: + descendants.append((child_pid, cmdline)) + descendants.extend(find_descendant_processes(child_pid)) + return descendants + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + return [] + + all_descendant_processes = find_descendant_processes(parent_pid) + return all_descendant_processes + + +def get_child_processes_legacy(parent_pid): + """ + Return a list of all child processes belonging to the same parent process id. + Note: this approach is not efficient if one is to find all child processes using + the parent pid as a starting point. Better to use a recursive function using psutil. + This method should be removed once psutil is available everywhere. + + :param parent_pid: parent process id (int) + :return: child processes (list). + """ + + child_processes = [] + + # Iterate through all directories in /proc + for _pid in os.listdir('/proc'): + if not _pid.isdigit(): + continue # Skip non-numeric directories + + try: + pid = int(_pid) + except ValueError as exc: + logger.warning(f'exception caught: got an unexpected value for pid={_pid}: {exc}') + continue + + try: + # Read the command line of the process + with open(f'/proc/{pid}/cmdline', 'rb') as cmdline_file: + cmdline = cmdline_file.read().decode().replace('\x00', ' ') + + # Read the parent PID of the process + with open(f'/proc/{pid}/stat', 'rb') as stat_file: + stat_info = stat_file.read().decode() + parts = stat_info.split() + ppid = int(parts[3]) # can throw a ValueError + + # Check if the parent PID matches the specified parent process + if ppid == parent_pid: + child_processes.append((pid, cmdline)) + + except (ValueError, FileNotFoundError, PermissionError): + continue # Process may have terminated or we don't have permission + + return child_processes