Skip to content

Commit

Permalink
Merge pull request #104 from PalNilsson/next
Browse files Browse the repository at this point in the history
3.6.9.10
PalNilsson authored Oct 3, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents efecbae + 0a6fda2 commit 7f56648
Showing 9 changed files with 129 additions and 58 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.6.8.29
3.6.9.10
2 changes: 1 addition & 1 deletion pilot/control/job.py
Original file line number Diff line number Diff line change
@@ -3001,7 +3001,7 @@ def preliminary_server_update(job, args, diagnostics):
:param diagnostics: error diagnostics (string).
"""

logger.debug(f'could have sent diagnostics={diagnostics}')
logger.warning(f'will send preliminary diagnostics (and pretend job is still running)={diagnostics}')
piloterrorcode = job.piloterrorcode
piloterrorcodes = job.piloterrorcodes
piloterrordiags = job.piloterrordiags
2 changes: 1 addition & 1 deletion pilot/control/payloads/generic.py
Original file line number Diff line number Diff line change
@@ -575,7 +575,7 @@ def get_payload_command(self, job):
try:
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
cmd = user.get_payload_command(job) # + 'sleep 900' # to test looping jobs
cmd = user.get_payload_command(job) #+ 'sleep 900' # to test looping jobs
except PilotException as error:
self.post_setup(job)
import traceback
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
@@ -13,8 +13,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '8' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '29' # build number should be reset to '1' for every new development cycle
REVISION = '9' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '10' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
32 changes: 19 additions & 13 deletions pilot/util/loopingjob.py
Original file line number Diff line number Diff line change
@@ -5,15 +5,16 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2018-20223
# - Paul Nilsson, [email protected], 2018-2023

from pilot.common.errorcodes import ErrorCodes
from pilot.util.auxiliary import whoami, set_pilot_state, cut_output, locate_core_file
from pilot.util.config import config
from pilot.util.container import execute #, execute_command
from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy, list_mod_files
from pilot.util.parameters import convert_to_int
from pilot.util.processes import kill_processes, find_zombies, handle_zombies, get_child_processes, reap_zombies
from pilot.util.processes import kill_process, find_zombies, handle_zombies, reap_zombies
from pilot.util.psutils import get_child_processes
from pilot.util.timing import time_stamp

import os
@@ -199,21 +200,14 @@ def kill_looping_job(job):
_, stdout, _ = execute(cmd, mute=True)
logger.info(f"{cmd} + '\n': {stdout}")

parent_pid = os.getpid()
child_processes = get_child_processes(parent_pid)
if child_processes:
logger.info(f"child processes of PID {parent_pid}:")
for pid, cmdline in child_processes:
logger.info(f"PID {pid}: {cmdline}")

# set the relevant error code
if job.state == 'stagein':
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT, priority=True)
elif job.state == 'stageout':
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT, priority=True)
else:
# most likely in the 'running' state, but use the catch-all 'else'
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB, priority=True)
set_pilot_state(job=job, state="failed")

# remove any lingering input files from the work dir
@@ -223,7 +217,19 @@ def kill_looping_job(job):
if _ec != 0:
logger.warning('failed to remove all files')

kill_processes(job.pid)
parent_pid = os.getpid()
#logger.info(f'killing main command process id in case it is still running: {job.pid}')
#kill_processes(job.pid)

child_processes = get_child_processes(parent_pid)
if child_processes:
logger.info(f"child processes of pilot (PID {parent_pid}) to be killed:")
for pid, cmdline in child_processes:
logger.info(f"PID {pid}: {cmdline}")
#_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
for pid, _ in child_processes:
#kill_processes(pid, korphans=False, ps_cache=ps_cache, nap=1)
kill_process(pid)


def get_looping_job_limit():
2 changes: 2 additions & 0 deletions pilot/util/monitoring.py
Original file line number Diff line number Diff line change
@@ -366,13 +366,15 @@ def verify_looping_job(current_time, mt, job, args):
# only perform looping job check if desired and enough time has passed since start
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
loopingjob_definitions = __import__('pilot.user.%s.loopingjob_definitions' % pilot_user, globals(), locals(), [pilot_user], 0)

runcheck = loopingjob_definitions.allow_loopingjob_detection()
if not job.looping_check and runcheck:
logger.debug('looping check not desired')
return 0, ""

time_since_start = get_time_since(job.jobid, PILOT_PRE_PAYLOAD, args) # payload walltime
looping_verification_time = convert_to_int(config.Pilot.looping_verification_time, default=600)

if time_since_start < looping_verification_time:
logger.debug(f'no point in running looping job algorithm since time since last payload start={time_since_start} s < '
f'looping verification time={looping_verification_time} s')
51 changes: 13 additions & 38 deletions pilot/util/processes.py
Original file line number Diff line number Diff line change
@@ -160,11 +160,13 @@ def dump_stack_trace(pid):
logger.info("skipping pstack dump for zombie process")


def kill_processes(pid):
def kill_processes(pid, korphans=True, ps_cache=None, nap=10):
"""
Kill process belonging to the process group that the given pid belongs to.
:param pid: process id (int).
:param pid: process id (int)
:param nap: napping time between kill signals in seconds (int)
:param korphans: kill orphans (bool).
"""

# if there is a known subprocess pgrp, then it should be enough to kill the group in one go
@@ -174,12 +176,13 @@ def kill_processes(pid):
except Exception:
pgrp = 0
if pgrp != 0:
status = kill_process_group(pgrp)
status = kill_process_group(pgrp, nap=nap)

if not status:
# firstly find all the children process IDs to be killed
children = []
_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
if not ps_cache:
_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
find_processes_in_group(children, pid, ps_cache)

# reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated)
@@ -213,7 +216,8 @@ def kill_processes(pid):
# kill any remaining orphan processes
# note: this should no longer be necessary since ctypes has made sure all subprocesses are parented
# if orphan process killing is not desired, set env var PILOT_NOKILL
kill_orphans()
if korphans:
kill_orphans()

# kill any lingering defunct processes
try:
@@ -254,7 +258,7 @@ def kill_defunct_children(pid):
pass


def kill_child_processes(pid):
def kill_child_processes(pid, ps_cache=None):
"""
Kill child processes.
@@ -263,7 +267,8 @@ def kill_child_processes(pid):
"""
# firstly find all the children process IDs to be killed
children = []
_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
if not ps_cache:
_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
find_processes_in_group(children, pid, ps_cache)

# reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated)
@@ -304,7 +309,7 @@ def kill_process(pid, hardkillonly=False):
if not hardkillonly:
kill(pid, signal.SIGTERM)

_t = 10
_t = 3
logger.info("sleeping %d s to allow process to exit", _t)
time.sleep(_t)

@@ -940,36 +945,6 @@ def handle_zombies(zombies, job=None):
job.zombies.append(pid)


def get_child_processes(parent_pid, proc_root='/proc'):
    """
    Return the direct child processes of the given parent process id.

    Every numeric directory below proc_root is inspected; a process is kept if the parent pid recorded
    in its 'stat' file equals parent_pid. Processes that cannot be read or parsed are skipped.

    :param parent_pid: parent process id (int)
    :param proc_root: path to the proc file system; only needs to be changed for testing (string)
    :return: list of (pid, cmdline) tuples, cmdline being the space separated command line (list).
    """

    child_processes = []

    # Iterate through all directories in /proc
    for entry in os.listdir(proc_root):
        if not entry.isdigit():
            continue  # Skip non-numeric directories

        try:
            pid = int(entry)

            # Read the command line of the process (NUL separated arguments, not necessarily valid UTF-8)
            with open(f'{proc_root}/{pid}/cmdline', 'rb') as cmdline_file:
                cmdline = cmdline_file.read().decode(errors='replace').replace('\x00', ' ')

            # Read the parent PID of the process
            with open(f'{proc_root}/{pid}/stat', 'rb') as stat_file:
                stat_info = stat_file.read().decode(errors='replace')

            # The command name (2nd field) is enclosed in parentheses and may itself contain spaces
            # or parentheses, so only the text after the last ')' is split: state, ppid, ...
            ppid = int(stat_info.rsplit(')', 1)[1].split()[1])
        except (ValueError, IndexError, OSError):
            continue  # Process may have terminated, is unreadable or has an unexpected stat format

        # Check if the parent PID matches the specified parent process
        if ppid == parent_pid:
            child_processes.append((pid, cmdline))

    return child_processes


def reap_zombies(pid: int = -1):
"""
Check for and reap zombie processes.
4 changes: 2 additions & 2 deletions pilot/util/processgroups.py
Original file line number Diff line number Diff line change
@@ -17,11 +17,12 @@
logger = logging.getLogger(__name__)


def kill_process_group(pgrp):
def kill_process_group(pgrp, nap=10):
"""
Kill the process group.
DO NOT MOVE TO PROCESSES.PY - will lead to circular import since execute() needs it as well.
:param pgrp: process group id (int).
:param nap: napping time between kill signals in seconds (int)
:return: boolean (True if SIGTERM followed by SIGKILL signalling was successful)
"""

@@ -39,7 +40,6 @@ def kill_process_group(pgrp):
logger.info(f"SIGTERM sent to process group {pgrp}")

if _sleep:
nap = 10
logger.info(f"sleeping {nap} s to allow processes to exit")
sleep(nap)

88 changes: 88 additions & 0 deletions pilot/util/psutils.py
Original file line number Diff line number Diff line change
@@ -124,3 +124,91 @@ def get_parent_pid(pid):
return parent_pid
except psutil.NoSuchProcess:
return None


def get_child_processes(parent_pid):
    """
    Look up the processes running below the given parent process id.

    When psutil is available, the lookup is delegated to get_all_descendant_processes(). Otherwise a
    warning is logged and get_child_processes_legacy(), which reads /proc/{pid}, is used instead.

    :param parent_pid: parent process id (int)
    :return: child processes (list).
    """

    # preferred path: psutil based lookup
    if _is_psutil_available:
        return get_all_descendant_processes(parent_pid)

    # fallback path: parse the proc file system directly
    logger.warning('get_child_processes(): psutil not available - using legacy code as a fallback')
    return get_child_processes_legacy(parent_pid)


def get_all_descendant_processes(parent_pid):
    """
    Find all descendants (children, grandchildren, ..) of the given parent pid.

    The process table is scanned only once with psutil and turned into a parent pid -> children map,
    which is then walked iteratively. A process is reported before its own children, and the set of
    visited pids guards against a process that is listed as its own parent (which would otherwise
    lead to endless recursion).

    :param parent_pid: parent process id (int)
    :return: descendant process ids and cmdline, as (pid, cmdline) tuples with cmdline as reported
             by psutil (list).
    """

    # single pass over the process table: group all processes by their parent pid
    children_of = {}
    try:
        for process in psutil.process_iter(attrs=['pid', 'ppid', 'cmdline']):
            process_info = process.info
            children_of.setdefault(process_info['ppid'], []).append(
                (process_info['pid'], process_info['cmdline'])
            )
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as exc:
        # keep whatever was collected so far instead of discarding everything
        logger.warning(f'process scan was interrupted: {exc}')

    all_descendant_processes = []
    visited = {parent_pid}

    # depth-first walk; children are pushed in reverse so that they are popped in scan order
    pending = list(reversed(children_of.get(parent_pid, [])))
    while pending:
        child_pid, cmdline = pending.pop()
        if child_pid in visited:
            continue
        visited.add(child_pid)
        all_descendant_processes.append((child_pid, cmdline))
        pending.extend(reversed(children_of.get(child_pid, [])))

    return all_descendant_processes


def get_child_processes_legacy(parent_pid, proc_root='/proc'):
    """
    Return a list of the direct child processes of the given parent process id.

    Every numeric directory below proc_root is inspected; a process is kept if the parent pid recorded
    in its 'stat' file equals parent_pid. Processes that cannot be read or parsed are skipped.

    Note: this approach is not efficient if one is to find all child processes using
    the parent pid as a starting point. Better to use a recursive function using psutil.
    This method should be removed once psutil is available everywhere.

    :param parent_pid: parent process id (int)
    :param proc_root: path to the proc file system; only needs to be changed for testing (string)
    :return: child processes, as (pid, cmdline) tuples with a space separated command line (list).
    """

    child_processes = []

    try:
        entries = os.listdir(proc_root)
    except OSError as exc:
        # no proc file system to read, so no child processes can be identified this way
        logger.warning(f'failed to list {proc_root}: {exc}')
        return child_processes

    # Iterate through all directories in /proc
    for _pid in entries:
        if not _pid.isdigit():
            continue  # Skip non-numeric directories

        try:
            pid = int(_pid)
        except ValueError as exc:
            logger.warning(f'exception caught: got an unexpected value for pid={_pid}: {exc}')
            continue

        try:
            # Read the command line of the process (NUL separated arguments, not necessarily valid UTF-8)
            with open(f'{proc_root}/{pid}/cmdline', 'rb') as cmdline_file:
                cmdline = cmdline_file.read().decode(errors='replace').replace('\x00', ' ')

            # Read the parent PID of the process
            with open(f'{proc_root}/{pid}/stat', 'rb') as stat_file:
                stat_info = stat_file.read().decode(errors='replace')

            # The command name (2nd field) is enclosed in parentheses and may itself contain spaces
            # or parentheses, so only the text after the last ')' is split: state, ppid, ...
            ppid = int(stat_info.rsplit(')', 1)[1].split()[1])  # can throw ValueError/IndexError
        except (ValueError, IndexError, OSError):
            continue  # Process may have terminated, is unreadable or has an unexpected stat format

        # Check if the parent PID matches the specified parent process
        if ppid == parent_pid:
            child_processes.append((pid, cmdline))

    return child_processes

0 comments on commit 7f56648

Please sign in to comment.