diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py index ce47f184a..bcd6e098c 100755 --- a/MC/bin/o2_dpg_workflow_runner.py +++ b/MC/bin/o2_dpg_workflow_runner.py @@ -40,6 +40,40 @@ def setup_logger(name, log_file, level=logging.INFO): metriclogger = setup_logger('pipeline_metric_logger', 'pipeline_metric.log') +# A fallback solution to getting all child procs +# in case psutil has problems (PermissionError). +# It returns the same list as psutil.children(recursive=True). +def getChildProcs(basepid): + cmd=''' + childprocs() { + local parent=$1 + if [ ! "$2" ]; then + child_pid_list="" + fi + if [ "$parent" ] ; then + child_pid_list="$child_pid_list $parent" + for childpid in $(pgrep -P ${parent}); do + childprocs $childpid "nottoplevel" + done; + fi + # return via a string list (only if toplevel) + if [ ! "$2" ]; then + echo "${child_pid_list}" + fi + } + ''' + cmd = cmd + '\n' + 'childprocs ' + str(basepid) + output = subprocess.check_output(cmd, shell=True) + plist = [] + for p in output.strip().split(): + try: + proc=psutil.Process(int(p)) + except psutil.NoSuchProcess: + continue + + plist.append(proc) + return plist + # # Code section to find all topological orderings # of a DAG. This is used to know when we can schedule @@ -363,20 +397,28 @@ def __init__(self, workflowfile, args, jmax=100): def SIGHandler(self, signum, frame): # basically forcing shut down of all child processes actionlogger.info("Signal " + str(signum) + " caught") - procs = psutil.Process().children(recursive=True) + try: + procs = psutil.Process().children(recursive=True) + except (psutil.NoSuchProcess): + pass + except (psutil.AccessDenied, PermissionError): + procs = getChildProcs(os.getpid()) + for p in procs: actionlogger.info("Terminating " + str(p)) try: - p.terminate() + p.terminate() except (psutil.NoSuchProcess, psutil.AccessDenied): - pass + pass + gone, alive = psutil.wait_procs(procs, timeout=3) for p in alive: - actionlogger.info("Killing " + str(p)) try: - p.kill() + actionlogger.info("Killing " + str(p)) + p.kill() except (psutil.NoSuchProcess, psutil.AccessDenied): - pass + pass + exit (1) def getallrequirements(self, t): @@ -434,6 +476,7 @@ def submit(self, tid, nice=0): p.nice(nice) self.nicevalues[tid]=nice except (psutil.NoSuchProcess, psutil.AccessDenied): + actionlogger.error('Couldn\'t set nice value of ' + str(p.pid) + ' to ' + str(nice) + ' -- current value is ' + str(p.nice())) self.nicevalues[tid]=0 return p @@ -539,9 +582,12 @@ def monitor(self, process_list): psutilProcs = [ proc ] # use psutil for CPU measurement psutilProcs = psutilProcs + proc.children(recursive=True) - except (psutil.NoSuchProcess, psutil.AccessDenied): + except (psutil.NoSuchProcess): continue + except (psutil.AccessDenied, PermissionError): + psutilProcs = psutilProcs + getChildProcs(pid) + # accumulate total metrics (CPU, memory) totalCPU = 0. totalPSS = 0.