3.6.9.10 #104

Merged · 4 commits · Oct 3, 2023
Changes from all commits

PILOTVERSION (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
-3.6.8.29
+3.6.9.10

pilot/control/job.py (2 changes: 1 addition & 1 deletion)
@@ -3001,7 +3001,7 @@ def preliminary_server_update(job, args, diagnostics):
     :param diagnostics: error diagnostics (string).
     """
 
-    logger.debug(f'could have sent diagnostics={diagnostics}')
+    logger.warning(f'will send preliminary diagnostics (and pretend job is still running)={diagnostics}')
     piloterrorcode = job.piloterrorcode
     piloterrorcodes = job.piloterrorcodes
     piloterrordiags = job.piloterrordiags

pilot/control/payloads/generic.py (2 changes: 1 addition & 1 deletion)
@@ -575,7 +575,7 @@ def get_payload_command(self, job):
         try:
             pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
             user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
-            cmd = user.get_payload_command(job) # + 'sleep 900' # to test looping jobs
+            cmd = user.get_payload_command(job) #+ 'sleep 900' # to test looping jobs
         except PilotException as error:
             self.post_setup(job)
             import traceback

pilot/util/constants.py (4 changes: 2 additions & 2 deletions)
@@ -13,8 +13,8 @@
 # Pilot version
 RELEASE = '3' # released number should be fixed at 3 for Pilot 3
 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates
-REVISION = '8' # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '29' # build number should be reset to '1' for every new development cycle
+REVISION = '9' # revision number should be reset to '0' for every new version release, increased for small updates
+BUILD = '10' # build number should be reset to '1' for every new development cycle
 
 SUCCESS = 0
 FAILURE = 1
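
Note: these four constants are what make up the dotted pilot version string, matching the PILOTVERSION file above. A minimal illustration (the helper name below is made up for this sketch; the pilot has its own code for this):

    # illustrative only - shows how RELEASE/VERSION/REVISION/BUILD map onto '3.6.9.10'
    RELEASE = '3'
    VERSION = '6'
    REVISION = '9'
    BUILD = '10'

    def assemble_version(release=RELEASE, version=VERSION, revision=REVISION, build=BUILD):
        """Join the four version components into a dotted version string."""
        return '.'.join((release, version, revision, build))

    assert assemble_version() == '3.6.9.10'  # agrees with the updated PILOTVERSION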

pilot/util/loopingjob.py (32 changes: 19 additions & 13 deletions)
@@ -5,15 +5,16 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 #
 # Authors:
-# - Paul Nilsson, [email protected], 2018-20223
+# - Paul Nilsson, [email protected], 2018-2023
 
 from pilot.common.errorcodes import ErrorCodes
 from pilot.util.auxiliary import whoami, set_pilot_state, cut_output, locate_core_file
 from pilot.util.config import config
 from pilot.util.container import execute #, execute_command
 from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy, list_mod_files
 from pilot.util.parameters import convert_to_int
-from pilot.util.processes import kill_processes, find_zombies, handle_zombies, get_child_processes, reap_zombies
+from pilot.util.processes import kill_process, find_zombies, handle_zombies, reap_zombies
+from pilot.util.psutils import get_child_processes
 from pilot.util.timing import time_stamp
 
 import os
@@ -199,21 +200,14 @@ def kill_looping_job(job):
     _, stdout, _ = execute(cmd, mute=True)
     logger.info(f"{cmd} + '\n': {stdout}")
 
-    parent_pid = os.getpid()
-    child_processes = get_child_processes(parent_pid)
-    if child_processes:
-        logger.info(f"child processes of PID {parent_pid}:")
-        for pid, cmdline in child_processes:
-            logger.info(f"PID {pid}: {cmdline}")
-
     # set the relevant error code
     if job.state == 'stagein':
-        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT)
+        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEINTIMEOUT, priority=True)
     elif job.state == 'stageout':
-        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT)
+        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.STAGEOUTTIMEOUT, priority=True)
     else:
         # most likely in the 'running' state, but use the catch-all 'else'
-        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB)
+        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOOPINGJOB, priority=True)
     set_pilot_state(job=job, state="failed")
 
     # remove any lingering input files from the work dir
@@ -223,7 +217,19 @@
     if _ec != 0:
         logger.warning('failed to remove all files')
 
-    kill_processes(job.pid)
+    parent_pid = os.getpid()
+    #logger.info(f'killing main command process id in case it is still running: {job.pid}')
+    #kill_processes(job.pid)
+
+    child_processes = get_child_processes(parent_pid)
+    if child_processes:
+        logger.info(f"child processes of pilot (PID {parent_pid}) to be killed:")
+        for pid, cmdline in child_processes:
+            logger.info(f"PID {pid}: {cmdline}")
+        #_, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
+        for pid, _ in child_processes:
+            #kill_processes(pid, korphans=False, ps_cache=ps_cache, nap=1)
+            kill_process(pid)
 
 
 def get_looping_job_limit():
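
In short, kill_looping_job() no longer calls kill_processes(job.pid) directly; it now walks the pilot's own child processes and terminates each one individually. A condensed sketch of that flow, with the logging and error handling trimmed (simplified, not the exact code above):

    import os

    from pilot.util.processes import kill_process
    from pilot.util.psutils import get_child_processes

    def kill_payload_children():
        """Terminate every process spawned under the pilot process itself."""
        parent_pid = os.getpid()  # the pilot itself, not job.pid
        for pid, _cmdline in get_child_processes(parent_pid):
            # kill_process() sends SIGTERM, naps briefly, then resorts to a hard kill
            kill_process(pid)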

pilot/util/monitoring.py (2 changes: 2 additions & 0 deletions)
@@ -366,13 +366,15 @@ def verify_looping_job(current_time, mt, job, args):
     # only perform looping job check if desired and enough time has passed since start
     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
     loopingjob_definitions = __import__('pilot.user.%s.loopingjob_definitions' % pilot_user, globals(), locals(), [pilot_user], 0)
+
     runcheck = loopingjob_definitions.allow_loopingjob_detection()
     if not job.looping_check and runcheck:
         logger.debug('looping check not desired')
         return 0, ""
 
     time_since_start = get_time_since(job.jobid, PILOT_PRE_PAYLOAD, args) # payload walltime
     looping_verification_time = convert_to_int(config.Pilot.looping_verification_time, default=600)
+
     if time_since_start < looping_verification_time:
         logger.debug(f'no point in running looping job algorithm since time since last payload start={time_since_start} s < '
                      f'looping verification time={looping_verification_time} s')

pilot/util/processes.py (51 changes: 13 additions & 38 deletions)
@@ -160,11 +160,13 @@ def dump_stack_trace(pid):
         logger.info("skipping pstack dump for zombie process")
 
 
-def kill_processes(pid):
+def kill_processes(pid, korphans=True, ps_cache=None, nap=10):
     """
     Kill process belonging to the process group that the given pid belongs to.
 
-    :param pid: process id (int).
+    :param pid: process id (int)
+    :param nap: napping time between kill signals in seconds (int)
+    :param korphans: kill orphans (bool).
     """
 
     # if there is a known subprocess pgrp, then it should be enough to kill the group in one go
@@ -174,12 +176,13 @@ def kill_processes(pid):
     except Exception:
         pgrp = 0
     if pgrp != 0:
-        status = kill_process_group(pgrp)
+        status = kill_process_group(pgrp, nap=nap)
 
     if not status:
         # firstly find all the children process IDs to be killed
         children = []
-        _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
+        if not ps_cache:
+            _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
         find_processes_in_group(children, pid, ps_cache)
 
         # reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated)
@@ -213,7 +216,8 @@ def kill_processes(pid):
     # kill any remaining orphan processes
     # note: this should no longer be necessary since ctypes has made sure all subprocesses are parented
     # if orphan process killing is not desired, set env var PILOT_NOKILL
-    kill_orphans()
+    if korphans:
+        kill_orphans()
 
     # kill any lingering defunct processes
     try:
@@ -254,7 +258,7 @@ def kill_defunct_children(pid):
             pass
 
 
-def kill_child_processes(pid):
+def kill_child_processes(pid, ps_cache=None):
     """
     Kill child processes.
 
@@ -263,7 +267,8 @@ def kill_child_processes(pid):
     """
     # firstly find all the children process IDs to be killed
     children = []
-    _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
+    if not ps_cache:
+        _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
     find_processes_in_group(children, pid, ps_cache)
 
     # reverse the process order so that the athena process is killed first (otherwise the stdout will be truncated)
@@ -304,7 +309,7 @@ def kill_process(pid, hardkillonly=False):
     if not hardkillonly:
         kill(pid, signal.SIGTERM)
 
-    _t = 10
+    _t = 3
     logger.info("sleeping %d s to allow process to exit", _t)
     time.sleep(_t)
 
@@ -940,36 +945,6 @@ def handle_zombies(zombies, job=None):
             job.zombies.append(pid)
 
 
-def get_child_processes(parent_pid):
-    child_processes = []
-
-    # Iterate through all directories in /proc
-    for pid in os.listdir('/proc'):
-        if not pid.isdigit():
-            continue # Skip non-numeric directories
-
-        pid = int(pid)
-        try:
-            # Read the command line of the process
-            with open(f'/proc/{pid}/cmdline', 'rb') as cmdline_file:
-                cmdline = cmdline_file.read().decode().replace('\x00', ' ')
-
-            # Read the parent PID of the process
-            with open(f'/proc/{pid}/stat', 'rb') as stat_file:
-                stat_info = stat_file.read().decode()
-                parts = stat_info.split()
-                ppid = int(parts[3])
-
-            # Check if the parent PID matches the specified parent process
-            if ppid == parent_pid:
-                child_processes.append((pid, cmdline))
-
-        except (FileNotFoundError, PermissionError):
-            continue # Process may have terminated or we don't have permission
-
-    return child_processes
-
-
 def reap_zombies(pid: int = -1):
     """
     Check for and reap zombie processes.
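
The new korphans, ps_cache and nap arguments make kill_processes() cheaper to call in a loop: the caller can take a single "ps -eo pid,ppid -m" snapshot, skip the per-call orphan sweep and shorten the grace period. A usage sketch under those assumptions (the PID list is made up for illustration):

    from pilot.util.container import execute
    from pilot.util.processes import kill_processes

    pids_to_kill = [12345, 12346]  # hypothetical process ids

    # take the ps snapshot once and reuse it for every kill
    _, ps_cache, _ = execute("ps -eo pid,ppid -m", mute=True)
    for pid in pids_to_kill:
        # korphans=False skips the global orphan sweep per call; nap=1 shortens the
        # pause between SIGTERM and SIGKILL to one second
        kill_processes(pid, korphans=False, ps_cache=ps_cache, nap=1)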

pilot/util/processgroups.py (4 changes: 2 additions & 2 deletions)
@@ -17,11 +17,12 @@
 logger = logging.getLogger(__name__)
 
 
-def kill_process_group(pgrp):
+def kill_process_group(pgrp, nap=10):
     """
     Kill the process group.
     DO NOT MOVE TO PROCESSES.PY - will lead to circular import since execute() needs it as well.
     :param pgrp: process group id (int).
+    :param nap: napping time between kill signals in seconds (int)
     :return: boolean (True if SIGTERM followed by SIGKILL signalling was successful)
 
 
@@ -39,7 +40,6 @@ def kill_process_group(pgrp):
         logger.info(f"SIGTERM sent to process group {pgrp}")
 
     if _sleep:
-        nap = 10
         logger.info(f"sleeping {nap} s to allow processes to exit")
         sleep(nap)
 
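
For context, kill_process_group() implements the usual two-step group termination, and the new nap argument is the grace period between the two signals. A minimal standalone sketch of that pattern (not the pilot's code; error handling simplified):

    import os
    import signal
    from time import sleep

    def terminate_group(pgrp: int, nap: int = 10) -> bool:
        """Send SIGTERM to a process group, wait nap seconds, then SIGKILL survivors."""
        try:
            os.killpg(pgrp, signal.SIGTERM)  # ask the group to exit gracefully
        except ProcessLookupError:
            return False  # the group is already gone
        sleep(nap)  # grace period before the hard kill
        try:
            os.killpg(pgrp, signal.SIGKILL)  # force-kill whatever is left
        except ProcessLookupError:
            pass  # everything exited during the nap
        return True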

pilot/util/psutils.py (88 changes: 88 additions & 0 deletions)
@@ -124,3 +124,91 @@ def get_parent_pid(pid):
         return parent_pid
     except psutil.NoSuchProcess:
         return None
+
+
+def get_child_processes(parent_pid):
+    """
+    Return a list of all child processes belonging to the same parent process id.
+    Using a fallback to /proc/{pid} in case psutil is not available.
+
+    :param parent_pid: parent process id (int)
+    :return: child processes (list).
+    """
+
+    if not _is_psutil_available:
+        logger.warning('get_child_processes(): psutil not available - using legacy code as a fallback')
+        return get_child_processes_legacy(parent_pid)
+    else:
+        return get_all_descendant_processes(parent_pid)
+
+
+def get_all_descendant_processes(parent_pid):
+    """
+    Recursively find child processes using the given parent pid as a starting point.
+
+    :param parent_pid: parent process id (int)
+    :return: descendant process ids and cmdline (list).
+    """
+
+    def find_descendant_processes(pid):
+        try:
+            descendants = []
+            for process in psutil.process_iter(attrs=['pid', 'ppid', 'cmdline']):
+                process_info = process.info
+                child_pid = process_info['pid']
+                ppid = process_info['ppid']
+                cmdline = process_info['cmdline']
+                if ppid == pid:
+                    descendants.append((child_pid, cmdline))
+                    descendants.extend(find_descendant_processes(child_pid))
+            return descendants
+        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+            return []
+
+    all_descendant_processes = find_descendant_processes(parent_pid)
+    return all_descendant_processes
+
+
+def get_child_processes_legacy(parent_pid):
+    """
+    Return a list of all child processes belonging to the same parent process id.
+    Note: this approach is not efficient if one is to find all child processes using
+    the parent pid as a starting point. Better to use a recursive function using psutil.
+    This method should be removed once psutil is available everywhere.
+
+    :param parent_pid: parent process id (int)
+    :return: child processes (list).
+    """
+
+    child_processes = []
+
+    # Iterate through all directories in /proc
+    for _pid in os.listdir('/proc'):
+        if not _pid.isdigit():
+            continue # Skip non-numeric directories
+
+        try:
+            pid = int(_pid)
+        except ValueError as exc:
+            logger.warning(f'exception caught: got an unexpected value for pid={_pid}: {exc}')
+            continue
+
+        try:
+            # Read the command line of the process
+            with open(f'/proc/{pid}/cmdline', 'rb') as cmdline_file:
+                cmdline = cmdline_file.read().decode().replace('\x00', ' ')
+
+            # Read the parent PID of the process
+            with open(f'/proc/{pid}/stat', 'rb') as stat_file:
+                stat_info = stat_file.read().decode()
+                parts = stat_info.split()
+                ppid = int(parts[3]) # can throw a ValueError
+
+            # Check if the parent PID matches the specified parent process
+            if ppid == parent_pid:
+                child_processes.append((pid, cmdline))
+
+        except (ValueError, FileNotFoundError, PermissionError):
+            continue # Process may have terminated or we don't have permission
+
+    return child_processes
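
A quick usage sketch for the new helper (assumes psutil is installed; the print loop is purely illustrative). Note that psutil's own Process.children(recursive=True) returns roughly the same set of descendants that the recursive helper above collects with process_iter():

    import os
    import psutil

    from pilot.util.psutils import get_child_processes

    # list every descendant of the current (pilot) process together with its command line
    for pid, cmdline in get_child_processes(os.getpid()):
        print(pid, cmdline)

    # roughly equivalent query using psutil directly
    descendants = psutil.Process(os.getpid()).children(recursive=True)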