diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 2c79b47ffb..d9b21a4cd8 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -12,14 +12,15 @@ from .task import PythonTask from .task import FunctionCall from .dask_dag import DaskVineDag -from .cvine import VINE_TEMP, vine_task_set_priority +from .cvine import VINE_TEMP -import contextlib -import cloudpickle import os import time -from uuid import uuid4 import random +import contextlib +import cloudpickle +from uuid import uuid4 +from collections import defaultdict try: import rich @@ -170,6 +171,7 @@ def get(self, dsk, keys, *, self.wrapper = wrapper self.wrapper_proc = wrapper_proc self.prune_files = prune_files + self.category_execution_time = defaultdict(list) if submit_per_cycle is not None and submit_per_cycle < 1: submit_per_cycle = None @@ -273,6 +275,7 @@ def _dask_execute(self, dsk, keys): print(f"{t.key} ran on {t.hostname}") if t.successful(): + self.category_execution_time[t.category].append(t.execution_time) result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode) rs = dag.set_result(t.key, result_file) self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls) @@ -334,6 +337,12 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if lazy and self.checkpoint_fn: lazy = self.checkpoint_fn(dag, k) + # each task has a category name + category = self.category_name(sexpr) + category_avg_execution_time = 0 + if len(self.category_execution_time[category]): + category_avg_execution_time = sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) + task_depth = dag.depth_of(k) if self.scheduling_mode == 'random': priority = round(random.uniform(-1, 1), 10) @@ -341,27 +350,31 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, 
enqueued_calls): priority = task_depth elif self.scheduling_mode == 'breadth-first': priority = -task_depth + elif self.scheduling_mode == 'longest-first': + priority = category_avg_execution_time + elif self.scheduling_mode == 'shortest-first': + priority = -category_avg_execution_time elif self.scheduling_mode == 'FIFO': priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': priority = round(time.time(), 6) else: priority = 0 - cat = self.category_name(sexpr) + if self.task_mode == 'tasks': - if cat not in self._categories_known: + if category not in self._categories_known: if self.resources: - self.set_category_resources_max(cat, self.resources) + self.set_category_resources_max(category, self.resources) if self.resources_mode: - self.set_category_mode(cat, self.resources_mode) + self.set_category_mode(category, self.resources_mode) if not self._categories_known: self.enable_monitoring() - self._categories_known.add(cat) + self._categories_known.add(category) t = PythonTaskDask(self, dag, k, sexpr, - category=cat, + category=category, environment=self.environment, extra_files=self.extra_files, env_vars=self.env_vars, @@ -369,7 +382,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): worker_transfers=lazy, wrapper=self.wrapper) - vine_task_set_priority(t._task, priority) + t.set_priority(priority) if self.env_per_task: t.set_command( f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}") @@ -381,13 +394,13 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if self.task_mode == 'function-calls': t = FunctionCallDask(self, dag, k, sexpr, - category=cat, + category=category, extra_files=self.extra_files, retries=retries, worker_transfers=lazy, wrapper=self.wrapper) - vine_task_set_priority(t._task, priority) + t.set_priority(priority) t.set_tag(tag) # tag that identifies this dag enqueued_calls.append(t) diff --git 
a/taskvine/src/bindings/python3/ndcctools/taskvine/task.py b/taskvine/src/bindings/python3/ndcctools/taskvine/task.py index 04b6dff7cc..0b0121e3b8 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/task.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/task.py @@ -811,6 +811,13 @@ def resources_allocated(self): return None return cvine.vine_task_get_resources(self._task, "allocated") + ## + # Get the execution time of the task in seconds. + # + @property + def execution_time(self): + return cvine.vine_task_get_execution_time(self._task) / 1e6 + ## # Adds inputs for nopen library and rules file and sets LD_PRELOAD # diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index 8bb5f772ab..4e61a5c05b 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -445,6 +445,13 @@ regardless of the priority. void vine_task_set_priority(struct vine_task *t, double priority); +/** Get the actual execution time of the task. +execution_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start. +@param t A task object. +@return The actual execution time of the task in microseconds (the Python binding divides this by 1e6 to report seconds). +*/ +double vine_task_get_execution_time(struct vine_task *t); + /** Specify an environment variable to be added to the task. @param t A task object @param name Name of the variable. diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index ea1307185a..b99e7c7d7d 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -655,6 +655,11 @@ void vine_task_set_priority(struct vine_task *t, double priority) t->priority = priority; } +double vine_task_get_execution_time(struct vine_task *t) +{ + return t->time_workers_execute_last_end - t->time_workers_execute_last_start; +} + int vine_task_set_monitor_output(struct vine_task *t, const char *monitor_output_directory) {