From 61ae62b6bf18310152a3819ce5297789944d6694 Mon Sep 17 00:00:00 2001 From: swenzel Date: Wed, 3 Mar 2021 09:41:31 +0100 Subject: [PATCH] pipeline runner: psutil Stability fixes and workarounds It turns out that psutil.children() has not enough priviledges to determine the list of child processes on some GRID Unixes. In such cases, we now use a custom solution based on a bash function. --- MC/bin/o2_dpg_workflow_runner.py | 60 ++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py index ce47f184a..bcd6e098c 100755 --- a/MC/bin/o2_dpg_workflow_runner.py +++ b/MC/bin/o2_dpg_workflow_runner.py @@ -40,6 +40,40 @@ def setup_logger(name, log_file, level=logging.INFO): metriclogger = setup_logger('pipeline_metric_logger', 'pipeline_metric.log') +# A fallback solution to getting all child procs +# in case psutil has problems (PermissionError). +# It returns the same list as psutil.children(recursive=True). +def getChildProcs(basepid): + cmd=''' + childprocs() { + local parent=$1 + if [ ! "$2" ]; then + child_pid_list="" + fi + if [ "$parent" ] ; then + child_pid_list="$child_pid_list $parent" + for childpid in $(pgrep -P ${parent}); do + childprocs $childpid "nottoplevel" + done; + fi + # return via a string list (only if toplevel) + if [ ! "$2" ]; then + echo "${child_pid_list}" + fi + } + ''' + cmd = cmd + '\n' + 'childprocs ' + str(basepid) + output = subprocess.check_output(cmd, shell=True) + plist = [] + for p in output.strip().split(): + try: + proc=psutil.Process(int(p)) + except psutil.NoSuchProcess: + continue + + plist.append(proc) + return plist + # # Code section to find all topological orderings # of a DAG. This is used to know when we can schedule @@ -363,20 +397,28 @@ def __init__(self, workflowfile, args, jmax=100): def SIGHandler(self, signum, frame): # basically forcing shut down of all child processes actionlogger.info("Signal " + str(signum) + " caught") - procs = psutil.Process().children(recursive=True) + try: + procs = psutil.Process().children(recursive=True) + except (psutil.NoSuchProcess): + pass + except (psutil.AccessDenied, PermissionError): + procs = getChildProcs(os.getpid()) + for p in procs: actionlogger.info("Terminating " + str(p)) try: - p.terminate() + p.terminate() except (psutil.NoSuchProcess, psutil.AccessDenied): - pass + pass + gone, alive = psutil.wait_procs(procs, timeout=3) for p in alive: - actionlogger.info("Killing " + str(p)) try: - p.kill() + actionlogger.info("Killing " + str(p)) + p.kill() except (psutil.NoSuchProcess, psutil.AccessDenied): - pass + pass + exit (1) def getallrequirements(self, t): @@ -434,6 +476,7 @@ def submit(self, tid, nice=0): p.nice(nice) self.nicevalues[tid]=nice except (psutil.NoSuchProcess, psutil.AccessDenied): + actionlogger.error('Couldn\'t set nice value of ' + str(p.pid) + ' to ' + str(nice) + ' -- current value is ' + str(p.nice())) self.nicevalues[tid]=0 return p @@ -539,9 +582,12 @@ def monitor(self, process_list): psutilProcs = [ proc ] # use psutil for CPU measurement psutilProcs = psutilProcs + proc.children(recursive=True) - except (psutil.NoSuchProcess, psutil.AccessDenied): + except (psutil.NoSuchProcess): continue + except (psutil.AccessDenied, PermissionError): + psutilProcs = psutilProcs + getChildProcs(pid) + # accumulate total metrics (CPU, memory) totalCPU = 0. totalPSS = 0.