Skip to content

Commit

Permalink
Interception of SIGSTOP (no action)
Browse files — browse the repository at this point in the history
Authored and committed by Paul Nilsson on Oct 3, 2023
1 parent ad4ad15 commit 0a6fda2
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 29 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.6.9.2
3.6.9.10
2 changes: 1 addition & 1 deletion pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3001,7 +3001,7 @@ def preliminary_server_update(job, args, diagnostics):
:param diagnostics: error diagnostics (string).
"""

logger.debug(f'could have sent diagnostics={diagnostics}')
logger.warning(f'will send preliminary diagnostics (and pretend job is still running)={diagnostics}')
piloterrorcode = job.piloterrorcode
piloterrorcodes = job.piloterrorcodes
piloterrordiags = job.piloterrordiags
Expand Down
2 changes: 1 addition & 1 deletion pilot/control/payloads/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ def get_payload_command(self, job):
try:
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
cmd = user.get_payload_command(job) # + 'sleep 900' # to test looping jobs
cmd = user.get_payload_command(job) #+ 'sleep 900' # to test looping jobs
except PilotException as error:
self.post_setup(job)
import traceback
Expand Down
2 changes: 1 addition & 1 deletion pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '9' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '2' # build number should be reset to '1' for every new development cycle
BUILD = '10' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
29 changes: 17 additions & 12 deletions pilot/util/loopingjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pilot.util.container import execute #, execute_command
from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy, list_mod_files
from pilot.util.parameters import convert_to_int
from pilot.util.processes import kill_processes, find_zombies, handle_zombies, reap_zombies
from pilot.util.processes import kill_process, find_zombies, handle_zombies, reap_zombies
from pilot.util.psutils import get_child_processes
from pilot.util.timing import time_stamp

Expand Down Expand Up @@ -200,21 +200,14 @@ def kill_looping_job(job):
_, stdout, _ = execute(cmd, mute=True)
logger.info(f"{cmd} + '\n': {stdout}")

parent_pid = os.getpid()
child_processes = get_child_processes(parent_pid)
if child_processes:
logger.info(f"child processes of PID {parent_pid}:")
for pid, cmdline in child_processes:
logger.info(f"PID {pid}: {cmdline}")

# set the relevant error code
if job.state == 'stagein':
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT, priority=True)
elif job.state == 'stageout':
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT, priority=True)
else:
# most likely in the 'running' state, but use the catch-all 'else'
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB, priority=True)
set_pilot_state(job=job, state="failed")

# remove any lingering input files from the work dir
Expand All @@ -224,7 +217,19 @@ def kill_looping_job(job):
if _ec != 0:
logger.warning('failed to remove all files')

kill_processes(job.pid)
parent_pid = os.getpid()
#logger.info(f'killing main command process id in case it is still running: {job.pid}')
#kill_processes(job.pid)

child_processes = get_child_processes(parent_pid)
if child_processes:
logger.info(f"child processes of pilot (PID {parent_pid}) to be killed:")
for pid, cmdline in child_processes:
logger.info(f"PID {pid}: {cmdline}")
#_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
for pid, _ in child_processes:
#kill_processes(pid, korphans=False, ps_cache=ps_cache, nap=1)
kill_process(pid)


def get_looping_job_limit():
Expand Down
2 changes: 2 additions & 0 deletions pilot/util/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,15 @@ def verify_looping_job(current_time, mt, job, args):
# only perform looping job check if desired and enough time has passed since start
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
loopingjob_definitions = __import__('pilot.user.%s.loopingjob_definitions' % pilot_user, globals(), locals(), [pilot_user], 0)

runcheck = loopingjob_definitions.allow_loopingjob_detection()
if not job.looping_check and runcheck:
logger.debug('looping check not desired')
return 0, ""

time_since_start = get_time_since(job.jobid, PILOT_PRE_PAYLOAD, args) # payload walltime
looping_verification_time = convert_to_int(config.Pilot.looping_verification_time, default=600)

if time_since_start < looping_verification_time:
logger.debug(f'no point in running looping job algorithm since time since last payload start={time_since_start} s < '
f'looping verification time={looping_verification_time} s')
Expand Down
21 changes: 13 additions & 8 deletions pilot/util/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,13 @@ def dump_stack_trace(pid):
logger.info("skipping pstack dump for zombie process")


def kill_processes(pid):
def kill_processes(pid, korphans=True, ps_cache=None, nap=10):
"""
Kill process belonging to the process group that the given pid belongs to.
:param pid: process id (int).
:param pid: process id (int)
:param nap: napping time between kill signals in seconds (int)
:param korphans: kill orphans (bool).
"""

# if there is a known subprocess pgrp, then it should be enough to kill the group in one go
Expand All @@ -174,12 +176,13 @@ def kill_processes(pid):
except Exception:
pgrp = 0
if pgrp != 0:
status = kill_process_group(pgrp)
status = kill_process_group(pgrp, nap=nap)

if not status:
# firstly find all the children process IDs to be killed
children = []
_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
if not ps_cache:
_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
find_processes_in_group(children, pid, ps_cache)

# reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated)
Expand Down Expand Up @@ -213,7 +216,8 @@ def kill_processes(pid):
# kill any remaining orphan processes
# note: this should no longer be necessary since ctypes has made sure all subprocesses are parented
# if orphan process killing is not desired, set env var PILOT_NOKILL
kill_orphans()
if korphans:
kill_orphans()

# kill any lingering defunct processes
try:
Expand Down Expand Up @@ -254,7 +258,7 @@ def kill_defunct_children(pid):
pass


def kill_child_processes(pid):
def kill_child_processes(pid, ps_cache=None):
"""
Kill child processes.
Expand All @@ -263,7 +267,8 @@ def kill_child_processes(pid):
"""
# firstly find all the children process IDs to be killed
children = []
_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
if not ps_cache:
_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
find_processes_in_group(children, pid, ps_cache)

# reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated)
Expand Down Expand Up @@ -304,7 +309,7 @@ def kill_process(pid, hardkillonly=False):
if not hardkillonly:
kill(pid, signal.SIGTERM)

_t = 10
_t = 3
logger.info("sleeping %d s to allow process to exit", _t)
time.sleep(_t)

Expand Down
4 changes: 2 additions & 2 deletions pilot/util/processgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
logger = logging.getLogger(__name__)


def kill_process_group(pgrp):
def kill_process_group(pgrp, nap=10):
"""
Kill the process group.
DO NOT MOVE TO PROCESSES.PY - will lead to circular import since execute() needs it as well.
:param pgrp: process group id (int).
:param nap: napping time between kill signals in seconds (int)
:return: boolean (True if SIGTERM followed by SIGKILL signalling was successful)
"""

Expand All @@ -39,7 +40,6 @@ def kill_process_group(pgrp):
logger.info(f"SIGTERM sent to process group {pgrp}")

if _sleep:
nap = 10
logger.info(f"sleeping {nap} s to allow processes to exit")
sleep(nap)

Expand Down
7 changes: 4 additions & 3 deletions pilot/util/psutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,19 @@ def get_all_descendant_processes(parent_pid):
Recursively find child processes using the given parent pid as a starting point.
:param parent_pid: parent process id (int)
:return: descendant process ids (list).
:return: descendant process ids and cmdline (list).
"""

def find_descendant_processes(pid):
try:
descendants = []
for process in psutil.process_iter(attrs=['pid', 'ppid']):
for process in psutil.process_iter(attrs=['pid', 'ppid', 'cmdline']):
process_info = process.info
child_pid = process_info['pid']
ppid = process_info['ppid']
cmdline = process_info['cmdline']
if ppid == pid:
descendants.append(child_pid)
descendants.append((child_pid, cmdline))
descendants.extend(find_descendant_processes(child_pid))
return descendants
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
Expand Down

0 comments on commit 0a6fda2

Please sign in to comment.