Skip to content

Commit

Permalink
vine: dask executor worker transfers segfault fix (#3831)
Browse files Browse the repository at this point in the history
* vine: tasks and funcalls behave the same wrt undeclaring files

The caller should be in charge of undeclaring outputs.

* vine: renamed lazy_transfers to worker_transfers

* add missing comma

* remove debug statement

* fix typo
  • Loading branch information
btovar committed Jun 14, 2024
1 parent 4e14eb5 commit e14eb6e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 26 deletions.
31 changes: 16 additions & 15 deletions taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ class DaskVine(Manager):
# @param environment A taskvine file representing an environment to run the tasks.
# @param extra_files A dictionary of {taskvine.File: "remote_name"} to add to each
# task.
# @param lazy_transfers Whether to keep intermediate results only at workers (True)
# or to bring back each result to the manager (False, default).
# @param worker_transfers Whether to keep intermediate results only at workers (True, default)
# or to bring back each result to the manager (False).
# True is more IO efficient, but runs the risk of needing to
# recompute results if workers are lost.
# @param env_vars A dictionary of VAR=VALUE environment variables to set per task. A value
# should be either a string, or a function that accepts as arguments the manager
# and task, and that returns a string.
# @param low_memory_mode Split graph vertices to reduce memory needed per function call. It
# removes some of the dask graph optimizations, thus proceed with care.
# @param checkpoint_fn When using lazy_transfers, a predicate with arguments (dag, key)
# @param checkpoint_fn When using worker_transfers, a predicate with arguments (dag, key)
# called before submitting a task. If True, the result is brought back
# to the manager.
# @param resources A dictionary with optional keys of cores, memory and disk (MB)
Expand Down Expand Up @@ -107,7 +107,7 @@ class DaskVine(Manager):
def get(self, dsk, keys, *,
environment=None,
extra_files=None,
lazy_transfers=False,
worker_transfers=True,
env_vars=None,
low_memory_mode=False,
checkpoint_fn=None,
Expand All @@ -127,7 +127,8 @@ def get(self, dsk, keys, *,
progress_label="[green]tasks",
wrapper=None,
wrapper_proc=print,
import_modules=None # Deprecated, use lib_modules
import_modules=None, # Deprecated, use lib_modules
lazy_transfers=True, # Deprecated, use worker_tranfers
):
try:
self.set_property("framework", "dask")
Expand All @@ -140,7 +141,7 @@ def get(self, dsk, keys, *,
self.environment = environment

self.extra_files = extra_files
self.lazy_transfers = lazy_transfers
self.worker_transfers = worker_transfers or lazy_transfers
self.env_vars = env_vars
self.low_memory_mode = low_memory_mode
self.checkpoint_fn = checkpoint_fn
Expand Down Expand Up @@ -320,7 +321,7 @@ def category_name(self, sexpr):
def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
targets = dag.get_targets()
for (k, sexpr) in rs:
lazy = self.lazy_transfers and k not in targets
lazy = self.worker_transfers and k not in targets
if lazy and self.checkpoint_fn:
lazy = self.checkpoint_fn(dag, k)

Expand All @@ -343,7 +344,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
extra_files=self.extra_files,
env_vars=self.env_vars,
retries=retries,
lazy_transfers=lazy,
worker_transfers=lazy,
wrapper=self.wrapper)

if self.env_per_task:
Expand All @@ -360,7 +361,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
category=cat,
extra_files=self.extra_files,
retries=retries,
lazy_transfers=lazy,
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_tag(tag) # tag that identifies this dag
Expand Down Expand Up @@ -490,7 +491,7 @@ class PythonTaskDask(PythonTask):
# @param extra_files Additional files to provide to the task.
# @param env_vars A dictionary of environment variables.
# @param retries Number of times to retry failed task.
# @param lazy_transfers If true, do not return outputs to manager until required.
# @param worker_transfers If true, do not return outputs to manager until required.
# @param wrapper
#
def __init__(self, m,
Expand All @@ -500,7 +501,7 @@ def __init__(self, m,
extra_files=None,
env_vars=None,
retries=5,
lazy_transfers=False,
worker_transfers=False,
wrapper=None):
self._key = key
self._sexpr = sexpr
Expand Down Expand Up @@ -533,7 +534,7 @@ def __init__(self, m,

if category:
self.set_category(category)
if lazy_transfers:
if worker_transfers:
self.enable_temp_output()
if environment:
self.add_environment(environment)
Expand Down Expand Up @@ -592,7 +593,7 @@ class FunctionCallDask(FunctionCall):
# @param resources Resources to be set for a FunctionCall.
# @param extra_files Additional files to provide to the task.
# @param retries Number of times to retry failed task.
# @param lazy_transfers If true, do not return outputs to manager until required.
# @param worker_transfers If true, do not return outputs to manager until required.
#

def __init__(self, m,
Expand All @@ -601,7 +602,7 @@ def __init__(self, m,
resources=None,
extra_files=None,
retries=5,
lazy_transfers=False,
worker_transfers=False,
wrapper=None):

self._key = key
Expand Down Expand Up @@ -632,7 +633,7 @@ def __init__(self, m,

if category:
self.set_category(category)
if lazy_transfers:
if worker_transfers:
self.enable_temp_output()
if extra_files:
for f, name in extra_files.items():
Expand Down
12 changes: 2 additions & 10 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ def __del__(self):
try:
if self._input_file:
self.manager.undeclare_file(self._input_file)
self._input_file = None
super().__del__()
except TypeError:
# in case the interpreter is shuting down. staging files will be deleted by manager atexit function.
Expand Down Expand Up @@ -1133,26 +1134,17 @@ def output(self):
else:
self._output = FunctionCallNoResult()

self.manager.undeclare_file(self._input_file)
self._input_file = None

self.manager.undeclare_file(self._output_file)
self._output_file = None

self._output_loaded = True
return self._output

# Remove input and output buffers if self.output was not called.
def __del__(self):
try:
if self._input_file:
self.manager.undeclare_file(self._input_file)
self._input_file = None
if self._output_file and not self._tmp_output_enabled:
self.manager.undeclare_file(self._output_file)
self._output_file = None
super().__del__()
except TypeError:
# in case the interpreter is shuting down. staging files will be deleted by manager atexit function.
pass


Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/manager/vine_file_replica_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ int vine_file_replica_table_replicate(struct vine_manager *m, struct vine_file *
return found;
}

debug(D_VINE, "Found %d workers to holding %s, %d replicas needed", nsources, f->cached_name, to_find);
debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsources, f->cached_name, to_find);

/* get the elements of set so we can insert new replicas to sources */
struct vine_worker_info **sources_frozen = (struct vine_worker_info **)set_values(sources);
Expand Down

0 comments on commit e14eb6e

Please sign in to comment.