Skip to content

Commit

Permalink
pipeline runner: psutil Stability fixes and workarounds
Browse files Browse the repository at this point in the history
It turns out that psutil.children() does not have sufficient
privileges to determine the list of child processes
on some GRID Unix systems.

In such cases, we now use a custom solution based on
a bash function.
  • Loading branch information
sawenzel committed Mar 3, 2021
1 parent 6b1f33f commit 61ae62b
Showing 1 changed file with 53 additions and 7 deletions.
60 changes: 53 additions & 7 deletions MC/bin/o2_dpg_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,40 @@ def setup_logger(name, log_file, level=logging.INFO):
metriclogger = setup_logger('pipeline_metric_logger', 'pipeline_metric.log')


# A fallback solution to getting all child procs
# in case psutil has problems (PermissionError).
# It returns the same list as psutil.children(recursive=True).
def getChildProcs(basepid):
    """Return the descendants of ``basepid`` as a list of ``psutil.Process``.

    Fallback for ``psutil.Process.children(recursive=True)``: the process
    tree is walked by a small shell function that calls ``pgrep -P``
    recursively and prints every pid it visits.

    The shell function also prints the starting pid itself. That entry is
    dropped here so the result contains descendants only, which is what
    ``children(recursive=True)`` returns.

    basepid -- pid (int or numeric string) whose descendants are wanted.

    Pids that have already exited by the time they are wrapped in a
    ``psutil.Process`` are skipped silently.
    """
    cmd = '''
    childprocs() {
      local parent=$1
      if [ ! "$2" ]; then
        child_pid_list=""
      fi
      if [ "$parent" ] ; then
        child_pid_list="$child_pid_list $parent"
        for childpid in $(pgrep -P ${parent}); do
          childprocs $childpid "nottoplevel"
        done;
      fi
      # return via a string list (only if toplevel)
      if [ ! "$2" ]; then
        echo "${child_pid_list}"
      fi
    }
    '''
    cmd = cmd + '\n' + 'childprocs ' + str(basepid)
    output = subprocess.check_output(cmd, shell=True)

    parentpid = int(basepid)
    plist = []
    for token in output.split():
        pid = int(token)
        if pid == parentpid:
            # the shell walk lists the starting pid too; it is not a child
            continue
        try:
            plist.append(psutil.Process(pid))
        except psutil.NoSuchProcess:
            # process vanished between the pgrep scan and now
            continue
    return plist

#
# Code section to find all topological orderings
# of a DAG. This is used to know when we can schedule
Expand Down Expand Up @@ -363,20 +397,28 @@ def __init__(self, workflowfile, args, jmax=100):
def SIGHandler(self, signum, frame):
    """Signal handler: shut down all child processes, then exit with status 1.

    signum -- number of the signal that was caught (logged).
    frame  -- current stack frame as passed by the signal machinery (unused).

    Children are first asked to terminate; those still alive after a
    3 second grace period are killed.
    """
    # basically forcing shut down of all child processes
    actionlogger.info("Signal " + str(signum) + " caught")
    ownpid = os.getpid()
    try:
        procs = psutil.Process().children(recursive=True)
    except psutil.NoSuchProcess:
        # nothing to clean up; keep procs defined for the loops below
        procs = []
    except (psutil.AccessDenied, PermissionError):
        # psutil may lack the privileges to list children on some systems;
        # use the shell-based lookup and make sure we never signal ourselves
        procs = [p for p in getChildProcs(ownpid) if p.pid != ownpid]

    for p in procs:
        actionlogger.info("Terminating " + str(p))
        try:
            p.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    _gone, alive = psutil.wait_procs(procs, timeout=3)
    for p in alive:
        try:
            actionlogger.info("Killing " + str(p))
            p.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    exit (1)

def getallrequirements(self, t):
Expand Down Expand Up @@ -434,6 +476,7 @@ def submit(self, tid, nice=0):
p.nice(nice)
self.nicevalues[tid]=nice
except (psutil.NoSuchProcess, psutil.AccessDenied):
actionlogger.error('Couldn\'t set nice value of ' + str(p.pid) + ' to ' + str(nice) + ' -- current value is ' + str(p.nice()))
self.nicevalues[tid]=0
return p

Expand Down Expand Up @@ -539,9 +582,12 @@ def monitor(self, process_list):
psutilProcs = [ proc ]
# use psutil for CPU measurement
psutilProcs = psutilProcs + proc.children(recursive=True)
except (psutil.NoSuchProcess, psutil.AccessDenied):
except (psutil.NoSuchProcess):
continue

except (psutil.AccessDenied, PermissionError):
psutilProcs = psutilProcs + getChildProcs(pid)

# accumulate total metrics (CPU, memory)
totalCPU = 0.
totalPSS = 0.
Expand Down

0 comments on commit 61ae62b

Please sign in to comment.