diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index e2fed570a6..89157509d0 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -176,8 +176,8 @@ def get(self, dsk, keys, *, self.wrapper_proc = wrapper_proc self.prune_files = prune_files self.category_execution_time = defaultdict(list) - self.max_priority = 1e100 - self.min_priority = -1e100 + self.max_priority = float('inf') + self.min_priority = float('-inf') if submit_per_cycle is not None and submit_per_cycle < 1: submit_per_cycle = None @@ -350,8 +350,10 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if self.scheduling_mode == 'random': priority = random.randint(self.min_priority, self.max_priority) elif self.scheduling_mode == 'depth-first': + # dig more information about different kinds of tasks priority = task_depth elif self.scheduling_mode == 'breadth-first': + # prefer to start all branches as soon as possible priority = -task_depth elif self.scheduling_mode == 'longest-first': # if no tasks have been executed in this category, set a high priority so that we know more information about each category @@ -360,8 +362,10 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): # if no tasks have been executed in this category, set a high priority so that we know more information about each category priority = -sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority elif self.scheduling_mode == 'FIFO': + # first in first out, the default behavior priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': + # last in first out, the opposite of FIFO priority = round(time.time(), 6) elif self.scheduling_mode == 'largest-input-first': # best for saving disk space (with pruing) @@ -667,6 +671,7 @@ def __init__(self, m, self.set_category(category) if worker_transfers: self.enable_temp_output() + if extra_files: for f, name in extra_files.items(): self.add_input(f, name)