[WFRunner] Handle resource limits and CPU better #1532

Merged
72 changes: 52 additions & 20 deletions MC/bin/o2_dpg_workflow_runner.py
@@ -535,13 +535,16 @@ class TaskResources:
"""
Container holding resources of a single task
"""
def __init__(self, tid, name, cpu, mem, resource_boundaries):
def __init__(self, tid, name, cpu, cpu_relative, mem, resource_boundaries):
# the task ID belonging to these resources
self.tid = tid
self.name = name
# original CPUs/MEM assigned (persistent)
self.cpu_assigned_original = cpu
self.mem_assigned_original = mem
# relative CPU, multiplied with the sampled CPU; set by the user, e.g. to allow backfilling tasks
# only takes effect when sampling resources; persistent
self.cpu_relative = cpu_relative if cpu_relative else 1
# CPUs/MEM assigned (transient)
self.cpu_assigned = cpu
self.mem_assigned = mem
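The new `cpu_relative` field above is a user-set scale factor applied to the sampled CPU, e.g. to let a task run as backfill. A minimal sketch of that arithmetic, with a hypothetical helper and made-up numbers (the real logic sits in `sample_resources()` further down):

```python
# Hypothetical helper illustrating the cpu_relative scaling; the numbers
# are made up and the actual logic lives in TaskResources.sample_resources().
def scaled_cpu(cpu_sampled: float, cpu_relative: float = 1.0) -> float:
    return cpu_sampled * cpu_relative

# A backfill-style task declared with relative_cpu=0.5 is booked for half
# of whatever CPU was measured for its related tasks ...
assert scaled_cpu(8.0, 0.5) == 4.0
# ... while the default factor of 1 leaves the sampled value untouched.
assert scaled_cpu(8.0) == 8.0
```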
@@ -571,6 +574,31 @@ def __init__(self, tid, name, cpu, mem, resource_boundaries):
def is_done(self):
return self.time_collect and not self.booked

def is_within_limits(self):
"""
Check if assigned resources respect limits
"""
cpu_within_limits = True
mem_within_limits = True
if self.cpu_assigned > self.resource_boundaries.cpu_limit:
cpu_within_limits = False
actionlogger.warning("CPU of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit)
if self.mem_assigned > self.resource_boundaries.mem_limit:
mem_within_limits = False
actionlogger.warning("MEM of task %s exceeds limits %d > %d", self.name, self.mem_assigned, self.resource_boundaries.mem_limit)
return cpu_within_limits and mem_within_limits

def limit_resources(self, cpu_limit=None, mem_limit=None):
"""
Limit resources of this specific task
"""
if not cpu_limit:
cpu_limit = self.resource_boundaries.cpu_limit
if not mem_limit:
mem_limit = self.resource_boundaries.mem_limit
self.cpu_assigned = min(self.cpu_assigned, cpu_limit)
self.mem_assigned = min(self.mem_assigned, mem_limit)

def add(self, time_passed, cpu, mem):
"""
Brief interface to add resources that were measured after time_passed
@@ -610,22 +638,23 @@ def sample_resources(self):
# This task ran already with the assigned resources, so let's set it to the limit
if cpu_sampled > self.resource_boundaries.cpu_limit:
actionlogger.warning("Sampled CPU (%.2f) exceeds assigned CPU limit (%.2f)", cpu_sampled, self.resource_boundaries.cpu_limit)
cpu_sampled = self.resource_boundaries.cpu_limit
elif cpu_sampled < 0:
actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
cpu_sampled = self.cpu_assigned

if mem_sampled > self.resource_boundaries.mem_limit:
actionlogger.warning("Sampled MEM (%.2f) exceeds assigned MEM limit (%.2f)", mem_sampled, self.resource_boundaries.mem_limit)
mem_sampled = self.resource_boundaries.mem_limit

if mem_sampled <= 0:
elif mem_sampled <= 0:
actionlogger.debug("Sampled memory for %s is %.2f <= 0, setting to previously assigned value %.2f", self.name, mem_sampled, self.mem_assigned)
mem_sampled = self.mem_assigned
if cpu_sampled < 0:
actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
cpu_sampled = self.cpu_assigned

for res in self.related_tasks:
if res.is_done or res.booked:
continue
res.cpu_assigned = cpu_sampled
res.cpu_assigned = cpu_sampled * res.cpu_relative
res.mem_assigned = mem_sampled
# this task has run before; stay optimistic and limit the resources in case the sampled ones exceed the limits
res.limit_resources()


class ResourceManager:
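Taken together, `sample_resources()` now scales the sampled CPU by each related task's `cpu_relative` and then clamps both CPU and memory through `limit_resources()`, instead of clamping the sampled values up front. A self-contained sketch of that order of operations, with illustrative names and numbers:

```python
# Hypothetical condensation of the new sampling flow:
# sample --> scale by cpu_relative --> clamp to the global limits.
def assign_from_sample(cpu_sampled, mem_sampled, cpu_relative, cpu_limit, mem_limit):
    cpu_assigned = cpu_sampled * cpu_relative
    mem_assigned = mem_sampled
    # mirrors limit_resources(): stay optimistic, but never book more
    # than the configured boundaries
    return min(cpu_assigned, cpu_limit), min(mem_assigned, mem_limit)

# A task sampled at 10 cores / 3000 MB with cpu_relative=0.3 under an
# 8-core / 2000 MB boundary ends up booked at 3 cores and 2000 MB.
print(assign_from_sample(10.0, 3000.0, 0.3, cpu_limit=8, mem_limit=2000))  # (3.0, 2000)
```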
@@ -676,19 +705,18 @@ def __init__(self, cpu_limit, mem_limit, procs_parallel_max=100, dynamic_resourc
# add 19 to get nice value of low-priority tasks
self.nice_backfill = self.nice_default + 19

def add_task_resources(self, name, related_tasks_name, cpu, mem, semaphore_string=None):
def add_task_resources(self, name, related_tasks_name, cpu, cpu_relative, mem, semaphore_string=None):
"""
Construct and Add a new TaskResources object
"""
resources = TaskResources(len(self.resources), name, cpu, mem, self.resource_boundaries)
if cpu > self.resource_boundaries.cpu_limit or mem > self.resource_boundaries.mem_limit:
actionlogger.warning(f"Resource estimates of id {len(self.resources)} overestimates limits, CPU limit: {self.resource_boundaries.cpu_limit}, MEM limit: {self.resource_boundaries.mem_limit}; might not run")
if not self.resource_boundaries.optimistic_resources:
# exit if we don't dare to try
print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
exit(1)
# or we do dare, let's see what happens...
actionlogger.info("We will try to run this task anyway with maximum available resources")
resources = TaskResources(len(self.resources), name, cpu, cpu_relative, mem, self.resource_boundaries)
if not resources.is_within_limits() and not self.resource_boundaries.optimistic_resources:
# exit if we don't dare to try
print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
print("Pass --optimistic-resources to the runner to attempt the run anyway.")
exit(1)
# if we get here, either all is good or the user decided to be optimistic; in both cases the resources are limited, by default to the given CPU and MEM limits
resources.limit_resources()

self.resources.append(resources)
# do the following to have the same Semaphore object for all corresponding TaskResources so that we do not need a lookup
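For the registration path, the effect of the rewrite is that an over-budget estimate is fatal only without `--optimistic-resources`; otherwise it is capped to the boundaries and accepted. A hedged, stand-alone sketch of that decision (the function and values are illustrative, not part of the runner):

```python
# Hypothetical condensation of the new add_task_resources() behaviour.
def accept_estimate(cpu, mem, cpu_limit, mem_limit, optimistic):
    within = cpu <= cpu_limit and mem <= mem_limit
    if not within and not optimistic:
        raise SystemExit(1)  # stands in for the runner's exit(1)
    # limit_resources(): cap to the boundaries and keep going
    return min(cpu, cpu_limit), min(mem, mem_limit)

print(accept_estimate(16, 1500, cpu_limit=8, mem_limit=2000, optimistic=True))  # (8, 1500)
```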
@@ -876,7 +904,11 @@ def __init__(self, workflowfile, args, jmax=100):
for task in self.workflowspec['stages']:
# ...and add all initial resource estimates
global_task_name = self.get_global_task_name(task["name"])
self.resource_manager.add_task_resources(task["name"], global_task_name, float(task["resources"]["cpu"]), float(task["resources"]["mem"]), task.get("semaphore"))
try:
cpu_relative = float(task["resources"]["relative_cpu"])
except (KeyError, TypeError):
# field absent or set to null --> neutral scaling factor
cpu_relative = 1
self.resource_manager.add_task_resources(task["name"], global_task_name, float(task["resources"]["cpu"]), cpu_relative, float(task["resources"]["mem"]), task.get("semaphore"))

self.procstatus = { tid:'ToDo' for tid in range(len(self.workflowspec['stages'])) }
self.taskneeds= { t:set(self.getallrequirements(t)) for t in self.taskuniverse }
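For completeness, a stage entry that feeds the new `relative_cpu` handling above could look like the sketch below; the task name and numbers are invented, and only the keys actually read by the loop (`name`, the `resources` sub-fields and the optional `semaphore`) are shown:

```python
# Illustrative (made-up) stage entry from a workflow spec and how the
# relative_cpu field is resolved, mirroring the try/except above.
stage = {
    "name": "some_backfill_task",          # hypothetical task name
    "resources": {"cpu": 1.0, "relative_cpu": 0.5, "mem": 2000.0},
    "semaphore": None,
}

try:
    cpu_relative = float(stage["resources"]["relative_cpu"])
except (KeyError, TypeError):
    # field absent or set to null --> neutral scaling factor
    cpu_relative = 1

print(cpu_relative)  # 0.5
```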