Skip to content

Commit

Permalink
[SimWF] Adjust cmds with resources
Browse files Browse the repository at this point in the history
  • Loading branch information
Benedikt Volkel committed Mar 13, 2024
1 parent fe71ee9 commit d1b7f5c
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions MC/bin/o2_dpg_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,44 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
break


def adjust_command(self, tid, c):
"""
Adjust a command with reousrces it should take
Some tasks can be configured via the command line that is run.
If possible and flagged in the command line, inject here the resources that should be taken
Currently, only CPU
"""
# this is the regex to look for
regex_string = "(!WFR_CPU:[1-9][0-9]*!)"
# tokens around the regex
splits = re.split(regex_string, c)
# all matches between the above tokens
matches = re.findall(regex_string, c)
if not splits:
# simply return the original command line, no match found
return c
res = self.resources[tid]
# get the assigned CPU
cpu = round(res.cpu_assigned)
# here we assemble the string to be returned
c_final = []
for sp, ma in zip(splits, matches):
if res.updated:
# take the updated assigned CPU value for this task, but only if it has been updated already
cpu_this = cpu
else:
# otherwise, extract the default value here
cpu_this = ma.strip("!").split(":")[-1]
# put the next split token followed by the CPU
c_final.append(sp)
c_final.append(str(cpu_this))
# add the final split token and return
c_final.append(splits[-1])
return " ".join(c_final)


class WorkflowExecutor:
# Constructor
def __init__(self, workflowfile, args, jmax=100):
Expand Down Expand Up @@ -1042,6 +1080,7 @@ def submit(self, tid, nice):
"""
actionlogger.debug("Submitting task " + str(self.idtotask[tid]) + " with nice value " + str(nice))
c = self.workflowspec['stages'][tid]['cmd']
c = self.resource_manager.adjust_command(tid, c)
workdir = self.workflowspec['stages'][tid]['cwd']
if workdir:
if os.path.exists(workdir) and not os.path.isdir(workdir):
Expand Down

0 comments on commit d1b7f5c

Please sign in to comment.