Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vine: daskvine priority scheduling #3923

24 changes: 20 additions & 4 deletions taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(self, dsk, low_memory_mode=False):
self._pending_parents_of = defaultdict(lambda: set())

# key->depth. One more than the greatest depth among the key's children; keys without children have depth 1
self._depth_of = defaultdict(lambda: float('inf'))
self._depth_of = defaultdict(lambda: 0)

# target keys that the dag should compute
self._targets = set()
Expand All @@ -102,17 +102,33 @@ def depth_of(self, key):
def initialize_graph(self):
    """Record the relations of every key, then compute every key's depth.

    The work is done in two separate passes over the working graph.
    set_depth() reads the children recorded by set_relations(), so the
    depths are only computed once the relations of all keys are in place.
    """
    for key, sexpr in self._working_graph.items():
        self.set_relations(key, sexpr)

    # Depth is derived from the key alone, so iterate the keys directly
    # instead of unpacking (and discarding) each value.
    for key in self._working_graph:
        self.set_depth(key)

def find_dependencies(self, sexpr, depth=0):
def find_dependencies(self, sexpr):
dependencies = set()
if self.graph_keyp(sexpr):
dependencies.add(sexpr)
self._depth_of[sexpr] = min(depth, self._depth_of[sexpr])
elif not DaskVineDag.symbolp(sexpr):
for sub in sexpr:
dependencies.update(self.find_dependencies(sub, depth + 1))
dependencies.update(self.find_dependencies(sub))
return dependencies

def set_depth(self, key):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this change. Didn't the old way of computing depth work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@btovar No, the old way didn't give me the right depth.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which way did it not work? Do you have an example? Is it a different definition of graph depth?

Copy link
Member Author

@JinZhou5042 JinZhou5042 Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested it about 2 months ago. My impression is that the depth of every task in the graph was always 0, 1, or some other fixed value, so I wrote a new function to calculate the depth.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It did work for me when I was testing checkpoints. Please check again, because this may indicate that something else in the code broke. Also, the old approach is much easier to follow, which may help when dealing with future bugs.

if key not in self._children_of or not self._children_of[key]:
self._depth_of[key] = 1
return 1

max_children_depth = 0
for child in self._children_of[key]:
if child not in self._depth_of:
child_depth = self.set_depth(child)
else:
child_depth = self._depth_of[child]
max_children_depth = max(max_children_depth, child_depth)
self._depth_of[key] = max_children_depth + 1
return self._depth_of[key]

def set_relations(self, key, sexpr):
sexpr = self._working_graph[key]
self._children_of[key] = self.find_dependencies(sexpr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
from .dask_dag import DaskVineDag
from .cvine import VINE_TEMP

import os
import time
import random
import contextlib
import cloudpickle
import os
from uuid import uuid4
from collections import defaultdict

try:
import rich
Expand Down Expand Up @@ -123,6 +126,7 @@ def get(self, dsk, keys, *,
lib_command=None,
lib_modules=None,
task_mode='tasks',
scheduling_mode='FIFO',
env_per_task=False,
progress_disable=False,
progress_label="[green]tasks",
Expand Down Expand Up @@ -164,12 +168,16 @@ def get(self, dsk, keys, *,
else:
self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated
self.task_mode = task_mode
self.scheduling_mode = scheduling_mode
self.env_per_task = env_per_task
self.progress_disable = progress_disable
self.progress_label = progress_label
self.wrapper = wrapper
self.wrapper_proc = wrapper_proc
self.prune_files = prune_files
self.category_execution_time = defaultdict(list)
self.max_priority = 1e100
JinZhou5042 marked this conversation as resolved.
Show resolved Hide resolved
self.min_priority = -1e100

if submit_per_cycle is not None and submit_per_cycle < 1:
submit_per_cycle = None
Expand Down Expand Up @@ -273,6 +281,7 @@ def _dask_execute(self, dsk, keys):
print(f"{t.key} ran on {t.hostname}")

if t.successful():
self.category_execution_time[t.category].append(t.execution_time)
result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode)
rs = dag.set_result(t.key, result_file)
self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls)
Expand Down Expand Up @@ -334,7 +343,32 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
if lazy and self.checkpoint_fn:
lazy = self.checkpoint_fn(dag, k)

# each task has a category name
cat = self.category_name(sexpr)

task_depth = dag.depth_of(k)
if self.scheduling_mode == 'random':
priority = random.randint(self.min_priority, self.max_priority)
elif self.scheduling_mode == 'depth-first':
priority = task_depth
elif self.scheduling_mode == 'breadth-first':
priority = -task_depth
elif self.scheduling_mode == 'longest-first':
# if no tasks have been executed in this category, set a high priority so that we know more information about each category
priority = sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority
elif self.scheduling_mode == 'shortest-first':
# if no tasks have been executed in this category, set a high priority so that we know more information about each category
priority = -sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority
elif self.scheduling_mode == 'FIFO':
priority = -round(time.time(), 6)
elif self.scheduling_mode == 'LIFO':
priority = round(time.time(), 6)
elif self.scheduling_mode == 'largest-input-first':
# best for saving disk space (with pruning)
priority = sum([len(dag.get_result(c)._file) for c in dag.get_children(k)])
else:
raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}")

if self.task_mode == 'tasks':
if cat not in self._categories_known:
if self.resources:
Expand All @@ -356,6 +390,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_priority(priority)
if self.env_per_task:
t.set_command(
f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}")
Expand All @@ -373,6 +408,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_priority(priority)
t.set_tag(tag) # tag that identifies this dag

enqueued_calls.append(t)
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,13 @@ def resources_allocated(self):
return None
return cvine.vine_task_get_resources(self._task, "allocated")

##
# Get the execution time of the task in seconds.
#
# The value reported by cvine.vine_task_get_execution_time() is divided by
# 1e6 before it is returned.
#
@property
def execution_time(self):
    raw_elapsed = cvine.vine_task_get_execution_time(self._task)
    return raw_elapsed / 1e6

##
# Adds inputs for nopen library and rules file and sets LD_PRELOAD
#
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,13 @@ regardless of the priority.

void vine_task_set_priority(struct vine_task *t, double priority);

/** Get the actual execution time of the task.
execution_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start.
@param t A task object.
@return The actual execution time of the task in microseconds (the Python binding divides this value by 1e6 to report seconds).
*/
double vine_task_get_execution_time(struct vine_task *t);

/** Specify an environment variable to be added to the task.
@param t A task object
@param name Name of the variable.
Expand Down
5 changes: 5 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,11 @@ void vine_task_set_priority(struct vine_task *t, double priority)
t->priority = priority;
}

/* Return the difference between the task's time_workers_execute_last_end and
 * time_workers_execute_last_start fields.
 * NOTE(review): the result is in the native units of those timestamps,
 * presumably microseconds since the Python binding divides it by 1e6 to
 * obtain seconds -- confirm against the field declarations. */
double vine_task_get_execution_time(struct vine_task *t)
{
	double elapsed = t->time_workers_execute_last_end - t->time_workers_execute_last_start;
	return elapsed;
}

int vine_task_set_monitor_output(struct vine_task *t, const char *monitor_output_directory)
{

Expand Down
Loading