Skip to content

Commit

Permalink
longest task first scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
JinZhou5042 committed Aug 22, 2024
1 parent 6403a31 commit 6ef5fef
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
39 changes: 26 additions & 13 deletions taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
from .task import PythonTask
from .task import FunctionCall
from .dask_dag import DaskVineDag
from .cvine import VINE_TEMP, vine_task_set_priority
from .cvine import VINE_TEMP

import contextlib
import cloudpickle
import os
import time
from uuid import uuid4
import random
import contextlib
import cloudpickle
from uuid import uuid4
from collections import defaultdict

try:
import rich
Expand Down Expand Up @@ -170,6 +171,7 @@ def get(self, dsk, keys, *,
self.wrapper = wrapper
self.wrapper_proc = wrapper_proc
self.prune_files = prune_files
self.category_execution_time = defaultdict(list)

if submit_per_cycle is not None and submit_per_cycle < 1:
submit_per_cycle = None
Expand Down Expand Up @@ -273,6 +275,7 @@ def _dask_execute(self, dsk, keys):
print(f"{t.key} ran on {t.hostname}")

if t.successful():
self.category_execution_time[t.category].append(t.execution_time)
result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode)
rs = dag.set_result(t.key, result_file)
self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls)
Expand Down Expand Up @@ -334,42 +337,52 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
if lazy and self.checkpoint_fn:
lazy = self.checkpoint_fn(dag, k)

# each task has a category name
category = self.category_name(sexpr)
category_avg_execution_time = 0
if len(self.category_execution_time[category]):
category_avg_execution_time = sum(self.category_execution_time[category]) / len(self.category_execution_time[category])

task_depth = dag.depth_of(k)
if self.scheduling_mode == 'random':
priority = round(random.uniform(-1, 1), 10)
elif self.scheduling_mode == 'depth-first':
priority = task_depth
elif self.scheduling_mode == 'breadth-first':
priority = -task_depth
elif self.scheduling_mode == 'longest-first':
priority = category_avg_execution_time
elif self.scheduling_mode == 'shortest-first':
priority = -category_avg_execution_time
elif self.scheduling_mode == 'FIFO':
priority = -round(time.time(), 6)
elif self.scheduling_mode == 'LIFO':
priority = round(time.time(), 6)
else:
priority = 0
cat = self.category_name(sexpr)

if self.task_mode == 'tasks':
if cat not in self._categories_known:
if category not in self._categories_known:
if self.resources:
self.set_category_resources_max(cat, self.resources)
self.set_category_resources_max(category, self.resources)
if self.resources_mode:
self.set_category_mode(cat, self.resources_mode)
self.set_category_mode(category, self.resources_mode)

if not self._categories_known:
self.enable_monitoring()
self._categories_known.add(cat)
self._categories_known.add(category)

t = PythonTaskDask(self,
dag, k, sexpr,
category=cat,
category=category,
environment=self.environment,
extra_files=self.extra_files,
env_vars=self.env_vars,
retries=retries,
worker_transfers=lazy,
wrapper=self.wrapper)

vine_task_set_priority(t._task, priority)
t.set_priority(priority)
if self.env_per_task:
t.set_command(
f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}")
Expand All @@ -381,13 +394,13 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
if self.task_mode == 'function-calls':
t = FunctionCallDask(self,
dag, k, sexpr,
category=cat,
category=category,
extra_files=self.extra_files,
retries=retries,
worker_transfers=lazy,
wrapper=self.wrapper)

vine_task_set_priority(t._task, priority)
t.set_priority(priority)
t.set_tag(tag) # tag that identifies this dag

enqueued_calls.append(t)
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,13 @@ def resources_allocated(self):
return None
return cvine.vine_task_get_resources(self._task, "allocated")

##
# Execution time of the task, expressed in seconds.
#
# The raw value reported by cvine.vine_task_get_execution_time for this task
# is scaled down by 1e6 before being returned.
#
@property
def execution_time(self):
    raw_elapsed = cvine.vine_task_get_execution_time(self._task)
    return raw_elapsed / 1e6

##
# Adds inputs for nopen library and rules file and sets LD_PRELOAD
#
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,13 @@ regardless of the priority.

void vine_task_set_priority(struct vine_task *t, double priority);

/** Get the actual execution time of the task.
execution_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start.
@param t A task object.
@return The difference between the two timestamps above, in the same units as those timestamps.
NOTE(review): the Python binding divides this value by 1e6 to report seconds, so the raw value
is presumably in microseconds rather than seconds -- confirm against the timestamp type.
*/
double vine_task_get_execution_time(struct vine_task *t);

/** Specify an environment variable to be added to the task.
@param t A task object
@param name Name of the variable.
Expand Down
5 changes: 5 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,11 @@ void vine_task_set_priority(struct vine_task *t, double priority)
t->priority = priority;
}

/*
Return the duration of the task's most recent execution on a worker, computed
as time_workers_execute_last_end - time_workers_execute_last_start. The result
is expressed in the same units as those two timestamp fields.
Returns 0 when the recorded end precedes the recorded start, i.e. when no
complete execution interval is available.
*/
double vine_task_get_execution_time(struct vine_task *t)
{
	/*
	NOTE(review): if the timestamp fields are unsigned (confirm against the
	struct definition), end - start would wrap around to a huge positive value
	whenever end < start, so that case is reported as 0 instead.
	*/
	if (t->time_workers_execute_last_end < t->time_workers_execute_last_start) {
		return 0;
	}

	return t->time_workers_execute_last_end - t->time_workers_execute_last_start;
}

int vine_task_set_monitor_output(struct vine_task *t, const char *monitor_output_directory)
{

Expand Down

0 comments on commit 6ef5fef

Please sign in to comment.