# for debugging without terminal access
# TODO: integrate into standard logger
def send_webhook(hook, t):
    """Post ``t`` as a JSON text message to the webhook URL ``hook``.

    This is a best-effort debugging aid: nothing is returned and a failure
    to deliver the message never raises.

    Args:
        hook: URL of the webhook endpoint, or None to disable sending.
        t: The message; any object, converted with ``str()``.
    """
    if hook is None:
        return

    # Imported locally so the function does not depend on the module's
    # top-level import list.
    import json
    import subprocess

    # json.dumps escapes quotes, backslashes and newlines in the message, so
    # e.g. the repr of a dict (full of single quotes) yields a valid payload.
    # The leading space in the text is kept from the original message format.
    payload = json.dumps({"text": " " + str(t)})

    # An argument list (no shell) means neither the message nor the URL is
    # ever interpreted by a shell, and no shell-specific redirection syntax
    # is needed to silence curl.
    command = [
        "curl",
        "-X", "POST",
        "-H", "Content-type: application/json",
        "--data", payload,
        str(hook),
    ]
    try:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError:
        # curl missing or not executable: stay silent, as the previous
        # os.system-based implementation did.
        pass
self.try_job_from_candidates(candidates, self.process_list, finished) + if len(candidates) > 0 and len(self.process_list) == 0: + actionlogger.info("Not able to make progress: Nothing scheduled although non-zero candidate set") + send_webhook(self.args.webhook,"Unable to make further progress: Quitting") + break finished_from_started = [] while self.waitforany(self.process_list, finished_from_started): @@ -889,6 +899,7 @@ def execute(self): candidates.append(candid) actionlogger.debug("New candidates " + str( candidates)) + send_webhook(self.args.webhook, "New candidates " + str(candidates)) if len(candidates)==0 and len(self.process_list)==0: break @@ -927,6 +938,7 @@ def execute(self): parser.add_argument('--cgroup', help='Execute pipeline under a given cgroup (e.g., 8coregrid) emulating resource constraints. This m\ ust exist and the tasks file must be writable to with the current user.') parser.add_argument('--stdout-on-failure', action='store_true', help='Print log files of failing tasks to stdout,') +parser.add_argument('--webhook', help=argparse.SUPPRESS) # log some infos to this webhook channel args = parser.parse_args() print (args)