diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py index 3c826476a..cfd3b2ce4 100755 --- a/MC/bin/o2_dpg_workflow_runner.py +++ b/MC/bin/o2_dpg_workflow_runner.py @@ -829,6 +829,44 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5) break + def adjust_command(self, tid, c): + """ + Adjust a command with reousrces it should take + + Some tasks can be configured via the command line that is run. + If possible and flagged in the command line, inject here the resources that should be taken + + Currently, only CPU + """ + # this is the regex to look for + regex_string = "(!WFR_CPU:[1-9][0-9]*!)" + # tokens around the regex + splits = re.split(regex_string, c) + # all matches between the above tokens + matches = re.findall(regex_string, c) + if not splits: + # simply return the original command line, no match found + return c + res = self.resources[tid] + # get the assigned CPU + cpu = round(res.cpu_assigned) + # here we assemble the string to be returned + c_final = [] + for sp, ma in zip(splits, matches): + if res.updated: + # take the updated assigned CPU value for this task, but only if it has been updated already + cpu_this = cpu + else: + # otherwise, extract the default value here + cpu_this = ma.strip("!").split(":")[-1] + # put the next split token followed by the CPU + c_final.append(sp) + c_final.append(str(cpu_this)) + # add the final split token and return + c_final.append(splits[-1]) + return " ".join(c_final) + + class WorkflowExecutor: # Constructor def __init__(self, workflowfile, args, jmax=100): @@ -1042,6 +1080,7 @@ def submit(self, tid, nice): """ actionlogger.debug("Submitting task " + str(self.idtotask[tid]) + " with nice value " + str(nice)) c = self.workflowspec['stages'][tid]['cmd'] + c = self.resource_manager.adjust_command(tid, c) workdir = self.workflowspec['stages'][tid]['cwd'] if workdir: if os.path.exists(workdir) and not os.path.isdir(workdir):