diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py index 3c826476a..9ff7a87ab 100755 --- a/MC/bin/o2_dpg_workflow_runner.py +++ b/MC/bin/o2_dpg_workflow_runner.py @@ -571,6 +571,31 @@ def __init__(self, tid, name, cpu, mem, resource_boundaries): def is_done(self): return self.time_collect and not self.booked + def is_within_limits(self): + """ + Check if assigned resources respect limits + """ + cpu_within_limits = True + mem_within_limits = True + if self.cpu_assigned > self.resource_boundaries.cpu_limit: + cpu_within_limits = False + actionlogger.warning("CPU of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit) + if self.cpu_assigned > self.resource_boundaries.mem_limit: + mem_within_limits = False + actionlogger.warning("MEM of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit) + return cpu_within_limits and mem_within_limits + + def limit_resources(self, cpu_limit=None, mem_limit=None): + """ + Limit resources of this specific task + """ + if not cpu_limit: + cpu_limit = self.resource_boundaries.cpu_limit + if not mem_limit: + mem_limit = self.resource_boundaries.mem_limit + self.cpu_assigned = min(self.cpu_assigned, cpu_limit) + self.mem_assigned = min(self.mem_assigned, mem_limit) + def add(self, time_passed, cpu, mem): """ Brief interface to add resources that were measured after time_passed @@ -610,22 +635,23 @@ def sample_resources(self): # This task ran already with the assigned resources, so let's set it to the limit if cpu_sampled > self.resource_boundaries.cpu_limit: actionlogger.warning("Sampled CPU (%.2f) exceeds assigned CPU limit (%.2f)", cpu_sampled, self.resource_boundaries.cpu_limit) - cpu_sampled = self.resource_boundaries.cpu_limit + elif cpu_sampled < 0: + actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned) + cpu_sampled = self.cpu_assigned + if mem_sampled > self.resource_boundaries.mem_limit: actionlogger.warning("Sampled MEM (%.2f) exceeds assigned MEM limit (%.2f)", mem_sampled, self.resource_boundaries.mem_limit) - mem_sampled = self.resource_boundaries.mem_limit - - if mem_sampled <= 0: + elif mem_sampled <= 0: actionlogger.debug("Sampled memory for %s is %.2f <= 0, setting to previously assigned value %.2f", self.name, mem_sampled, self.mem_assigned) mem_sampled = self.mem_assigned - if cpu_sampled < 0: - actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned) - cpu_sampled = self.cpu_assigned + for res in self.related_tasks: if res.is_done or res.booked: continue res.cpu_assigned = cpu_sampled res.mem_assigned = mem_sampled + # This task has been run before, stay optimistic and limit the resources in case the sampled ones exceed limits + res.limit_resources() class ResourceManager: @@ -681,14 +707,12 @@ def add_task_resources(self, name, related_tasks_name, cpu, mem, semaphore_strin Construct and Add a new TaskResources object """ resources = TaskResources(len(self.resources), name, cpu, mem, self.resource_boundaries) - if cpu > self.resource_boundaries.cpu_limit or mem > self.resource_boundaries.mem_limit: - actionlogger.warning(f"Resource estimates of id {len(self.resources)} overestimates limits, CPU limit: {self.resource_boundaries.cpu_limit}, MEM limit: {self.resource_boundaries.mem_limit}; might not run") - if not self.resource_boundaries.optimistic_resources: - # exit if we don't dare to try - print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).") - exit(1) - # or we do dare, let's see what happens... - actionlogger.info("We will try to run this task anyway with maximum available resources") + if not resources.is_within_limits() and not self.resource_boundaries.optimistic_resources: + # exit if we don't dare to try + print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).") + exit(1) + # if we get here, either all is good or the user decided to be optimistic and we limit the resources, by default to the given CPU and mem limits. + resources.limit_resources() self.resources.append(resources) # do the following to have the same Semaphore object for all corresponding TaskResources so that we do not need a lookup