Skip to content

Commit

Permalink
[WFRunner] Handle resource limits and CPU better
Browse files Browse the repository at this point in the history
* Account for relative CPU factor in case of sampling
  Studies have shown that being able to backfill tasks can have a
  difference for CPU efficiency, especially for transport.
  That is in particular important for high-efficiency jobs such as
  high-interaction-rate pp simulations.

* Abort by default if estimated resources exceed limits

* Run anyway, if --optimistic-resources is passed
  * Fix: Actually reset the overestimated resources to the limits as
    otherwise the runner would silently quit when nothing else can be
    done.

* In case of dynamically sampled resources and if a corresponding task
  has been run already:
  Reset the assigned resources to the limits if they exceed the
  boundaries.
  • Loading branch information
Benedikt Volkel committed Mar 17, 2024
1 parent 0aee2a3 commit c01d0ad
Showing 1 changed file with 52 additions and 20 deletions.
72 changes: 52 additions & 20 deletions MC/bin/o2_dpg_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,16 @@ class TaskResources:
"""
Container holding resources of a single task
"""
def __init__(self, tid, name, cpu, mem, resource_boundaries):
def __init__(self, tid, name, cpu, cpu_relative, mem, resource_boundaries):
# the task ID belonging to these resources
self.tid = tid
self.name = name
# original CPUs/MEM assigned (persistent)
self.cpu_assigned_original = cpu
self.mem_assigned_original = mem
# relative CPU, to be multiplied with sampled CPU; set by the user, e.g. to allow to backfill tasks
# only takes effect when sampling resources; persistent
self.cpu_relative = cpu_relative if cpu_relative else 1
# CPUs/MEM assigned (transient)
self.cpu_assigned = cpu
self.mem_assigned = mem
Expand Down Expand Up @@ -571,6 +574,31 @@ def __init__(self, tid, name, cpu, mem, resource_boundaries):
def is_done(self):
return self.time_collect and not self.booked

def is_within_limits(self):
"""
Check if assigned resources respect limits
"""
cpu_within_limits = True
mem_within_limits = True
if self.cpu_assigned > self.resource_boundaries.cpu_limit:
cpu_within_limits = False
actionlogger.warning("CPU of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit)
if self.cpu_assigned > self.resource_boundaries.mem_limit:
mem_within_limits = False
actionlogger.warning("MEM of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit)
return cpu_within_limits and mem_within_limits

def limit_resources(self, cpu_limit=None, mem_limit=None):
"""
Limit resources of this specific task
"""
if not cpu_limit:
cpu_limit = self.resource_boundaries.cpu_limit
if not mem_limit:
mem_limit = self.resource_boundaries.mem_limit
self.cpu_assigned = min(self.cpu_assigned, cpu_limit)
self.mem_assigned = min(self.mem_assigned, mem_limit)

def add(self, time_passed, cpu, mem):
"""
Brief interface to add resources that were measured after time_passed
Expand Down Expand Up @@ -610,22 +638,23 @@ def sample_resources(self):
# This task ran already with the assigned resources, so let's set it to the limit
if cpu_sampled > self.resource_boundaries.cpu_limit:
actionlogger.warning("Sampled CPU (%.2f) exceeds assigned CPU limit (%.2f)", cpu_sampled, self.resource_boundaries.cpu_limit)
cpu_sampled = self.resource_boundaries.cpu_limit
elif cpu_sampled < 0:
actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
cpu_sampled = self.cpu_assigned

if mem_sampled > self.resource_boundaries.mem_limit:
actionlogger.warning("Sampled MEM (%.2f) exceeds assigned MEM limit (%.2f)", mem_sampled, self.resource_boundaries.mem_limit)
mem_sampled = self.resource_boundaries.mem_limit

if mem_sampled <= 0:
elif mem_sampled <= 0:
actionlogger.debug("Sampled memory for %s is %.2f <= 0, setting to previously assigned value %.2f", self.name, mem_sampled, self.mem_assigned)
mem_sampled = self.mem_assigned
if cpu_sampled < 0:
actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
cpu_sampled = self.cpu_assigned

for res in self.related_tasks:
if res.is_done or res.booked:
continue
res.cpu_assigned = cpu_sampled
res.cpu_assigned = cpu_sampled * res.cpu_relative
res.mem_assigned = mem_sampled
# This task has been run before, stay optimistic and limit the resources in case the sampled ones exceed limits
res.limit_resources()


class ResourceManager:
Expand Down Expand Up @@ -676,19 +705,18 @@ def __init__(self, cpu_limit, mem_limit, procs_parallel_max=100, dynamic_resourc
# add 19 to get nice value of low-priority tasks
self.nice_backfill = self.nice_default + 19

def add_task_resources(self, name, related_tasks_name, cpu, mem, semaphore_string=None):
def add_task_resources(self, name, related_tasks_name, cpu, cpu_relative, mem, semaphore_string=None):
"""
Construct and Add a new TaskResources object
"""
resources = TaskResources(len(self.resources), name, cpu, mem, self.resource_boundaries)
if cpu > self.resource_boundaries.cpu_limit or mem > self.resource_boundaries.mem_limit:
actionlogger.warning(f"Resource estimates of id {len(self.resources)} overestimates limits, CPU limit: {self.resource_boundaries.cpu_limit}, MEM limit: {self.resource_boundaries.mem_limit}; might not run")
if not self.resource_boundaries.optimistic_resources:
# exit if we don't dare to try
print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
exit(1)
# or we do dare, let's see what happens...
actionlogger.info("We will try to run this task anyway with maximum available resources")
resources = TaskResources(len(self.resources), name, cpu, cpu_relative, mem, self.resource_boundaries)
if not resources.is_within_limits() and not self.resource_boundaries.optimistic_resources:
# exit if we don't dare to try
print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
print("Pass --optimistic-resources to the runner to attempt the run anyway.")
exit(1)
# if we get here, either all is good or the user decided to be optimistic and we limit the resources, by default to the given CPU and mem limits.
resources.limit_resources()

self.resources.append(resources)
# do the following to have the same Semaphore object for all corresponding TaskResources so that we do not need a lookup
Expand Down Expand Up @@ -876,7 +904,11 @@ def __init__(self, workflowfile, args, jmax=100):
for task in self.workflowspec['stages']:
# ...and add all initial resource estimates
global_task_name = self.get_global_task_name(task["name"])
self.resource_manager.add_task_resources(task["name"], global_task_name, float(task["resources"]["cpu"]), float(task["resources"]["mem"]), task.get("semaphore"))
try:
cpu_relative = float(task["resources"]["relative_cpu"])
except TypeError:
cpu_relative = 1
self.resource_manager.add_task_resources(task["name"], global_task_name, float(task["resources"]["cpu"]), cpu_relative, float(task["resources"]["mem"]), task.get("semaphore"))

self.procstatus = { tid:'ToDo' for tid in range(len(self.workflowspec['stages'])) }
self.taskneeds= { t:set(self.getallrequirements(t)) for t in self.taskuniverse }
Expand Down

0 comments on commit c01d0ad

Please sign in to comment.