diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/task.py b/taskvine/src/bindings/python3/ndcctools/taskvine/task.py index c043d62626..353f85981f 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/task.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/task.py @@ -126,7 +126,7 @@ def __del__(self): pass @staticmethod - def _determine_mount_flags(watch=False, failure_only=False, success_only=False, strict_input=False, mount_symlink=False): + def _determine_mount_flags(watch=False, failure_only=False, success_only=False, strict_input=False, mount_hardlink=False ): flags = cvine.VINE_TRANSFER_ALWAYS if watch: flags |= cvine.VINE_WATCH @@ -136,8 +136,8 @@ def _determine_mount_flags(watch=False, failure_only=False, success_only=False, flags |= cvine.VINE_SUCCESS_ONLY if strict_input: flags |= cvine.VINE_FIXED_LOCATION - if mount_symlink: - flags |= cvine.VINE_MOUNT_SYMLINK + if mount_hardlink: + flags |= cvine.VINE_MOUNT_HARDLINK return flags @staticmethod @@ -341,12 +341,12 @@ def add_feature(self, name): # >>> f = m.declare_untar(url) # >>> task.add_input(f,"data") # @endcode - def add_input(self, file, remote_name, strict_input=False, mount_symlink=False): + def add_input(self, file, remote_name, strict_input=False, mount_hardlink=False ): # SWIG expects strings if not isinstance(remote_name, str): raise TypeError(f"remote_name {remote_name} is not a str") - flags = Task._determine_mount_flags(strict_input=strict_input, mount_symlink=mount_symlink) + flags = Task._determine_mount_flags(strict_input=strict_input,mount_hardlink=mount_hardlink) if cvine.vine_task_add_input(self._task, file._file, remote_name, flags) == 0: raise ValueError("invalid file description") diff --git a/taskvine/src/manager/Makefile b/taskvine/src/manager/Makefile index 849cb50262..ec78e9d5fa 100644 --- a/taskvine/src/manager/Makefile +++ b/taskvine/src/manager/Makefile @@ -15,6 +15,7 @@ SOURCES = \ vine_task.c \ vine_file.c \ vine_mount.c \ + vine_symlink.c \ vine_txn_log.c \ vine_taskgraph_log.c \ vine_cached_name.c \ diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index 2f9c5c683e..80bf9e6da2 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -38,7 +38,7 @@ expected events. #define VINE_RANDOM_PORT 0 /**< Indicates that any port may be chosen. */ #define VINE_WAIT_FOREVER -1 /**< Timeout value to wait for a task to complete before returning. */ -/** Select optional handling for input and output files: caching, unpacking, watching, etc. **/ +/** Select optional handling for mounting input and output files to the task namespace. */ typedef enum { VINE_TRANSFER_ALWAYS = 0, /**< Always transfer this file when needed. */ @@ -50,8 +50,8 @@ typedef enum { VINE_SUCCESS_ONLY = 8, /**< Only return this output file if the task succeeded. */ VINE_RETRACT_ON_RESET = 16, /**< Remove this file from the mount lists if the task is reset. (TaskVine internal use only.) */ - VINE_MOUNT_SYMLINK = 32, /**< Permit this directory to be mounted via symlink instead of hardlink. */ - VINE_MOUNT_MKDIR = 64 /**< Create this empty output directory in the task sandbox prior to execution. */ + VINE_MOUNT_MKDIR = 64, /**< Create this empty output directory in the task sandbox prior to execution. */ + VINE_MOUNT_HARDLINK = 128, /**< Hard-link this input file (or directory) from the cache to the sandbox. */ } vine_mount_flags_t; /** Control caching and sharing behavior of file objects. **/ @@ -446,6 +446,15 @@ feature. */ void vine_task_add_feature(struct vine_task *t, const char *name); +/** Add a symbolic link to the sandbox namespace of the task. +This can be used to access a file within a shared filesystem, +without causing the file to be transferred using the taskvine caching infrastructure. +@param t A task object; +@param name The name of the symlink; must be a relative path in the task sandbox. +@param target The target of the symlink; must be an absolute path to an outside filesystem. +*/ +void vine_task_add_symlink(struct vine_task *t, const char *name, const char *target); + /** Specify the priority of this task relative to others in the manager. Tasks with a higher priority value run first. If no priority is given, a task is placed at the end of the ready list, regardless of the priority. diff --git a/taskvine/src/manager/vine_manager_put.c b/taskvine/src/manager/vine_manager_put.c index 83dbaeb64f..58612134bd 100644 --- a/taskvine/src/manager/vine_manager_put.c +++ b/taskvine/src/manager/vine_manager_put.c @@ -11,6 +11,7 @@ See the file COPYING for details. #include "vine_file_replica_table.h" #include "vine_mount.h" #include "vine_protocol.h" +#include "vine_symlink.h" #include "vine_task.h" #include "vine_txn_log.h" #include "vine_worker_info.h" @@ -560,6 +561,18 @@ vine_result_code_t vine_manager_put_task( } } + if (t->symlink_list) { + struct vine_symlink *s; + LIST_ITERATE(t->symlink_list, s) + { + char name_encoded[PATH_MAX]; + char target_encoded[PATH_MAX]; + url_encode(s->name, name_encoded, PATH_MAX); + url_encode(s->target, target_encoded, PATH_MAX); + vine_manager_send(q, w, "symlink %s %s\n", name_encoded, target_encoded); + } + } + // vine_manager_send returns the number of bytes sent, or a number less than // zero to indicate errors. We are lazy here, we only check the last // message we sent to the worker (other messages may have failed above). diff --git a/taskvine/src/manager/vine_symlink.c b/taskvine/src/manager/vine_symlink.c new file mode 100644 index 0000000000..88df252edc --- /dev/null +++ b/taskvine/src/manager/vine_symlink.c @@ -0,0 +1,27 @@ + +#include "vine_symlink.h" + +#include "xxmalloc.h" +#include + +struct vine_symlink *vine_symlink_create(const char *name, const char *target) +{ + struct vine_symlink *s = malloc(sizeof(*s)); + s->name = xxstrdup(name); + s->target = xxstrdup(target); + return s; +} + +struct vine_symlink *vine_symlink_copy(struct vine_symlink *s) +{ + return vine_symlink_create(s->name, s->target); +} + +void vine_symlink_delete(struct vine_symlink *s) +{ + if (!s) + return; + free(s->name); + free(s->target); + free(s); +} diff --git a/taskvine/src/manager/vine_symlink.h b/taskvine/src/manager/vine_symlink.h new file mode 100644 index 0000000000..a9420afca1 --- /dev/null +++ b/taskvine/src/manager/vine_symlink.h @@ -0,0 +1,29 @@ +/* +Copyright (C) 2024- The University of Notre Dame +This software is distributed under the GNU General Public License. +See the file COPYING for details. +*/ + +#ifndef VINE_SYMLINK_H +#define VINE_SYMLINK_H + +/* +A vine symlink is a lightweight structure representing a symlink +to be added to the sandbox namespace of a task as it runs. +Note that a symlink is not treated as either an input file +or an output file in the vine infrastructure, because there are +no file contents to cache or move. It is simply an addition +to the task namespace performed just prior to execution. +*/ + +struct vine_symlink { + char *name; + char *target; +}; + +struct vine_symlink * vine_symlink_create( const char *name, const char *target ); +struct vine_symlink * vine_symlink_copy( struct vine_symlink *s ); +void vine_symlink_delete( struct vine_symlink *s ); + +#endif + diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index c0ea564b27..1201c137d8 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -9,6 +9,7 @@ See the file COPYING for details. #include "vine_file.h" #include "vine_manager.h" #include "vine_mount.h" +#include "vine_symlink.h" #include "vine_worker_info.h" #include "debug.h" @@ -51,6 +52,7 @@ struct vine_task *vine_task_create(const char *command_line) t->input_mounts = list_create(); t->output_mounts = list_create(); + t->symlink_list = list_create(); t->env_list = list_create(); t->feature_list = list_create(); @@ -177,6 +179,17 @@ static void vine_task_mount_list_copy(struct list *destination, struct list *lis } } +static void vine_task_symlink_list_copy(struct list *destination, struct list *list) +{ + struct vine_symlink *old_symlink, *new_symlink; + + LIST_ITERATE(list, old_symlink) + { + new_symlink = vine_symlink_copy(old_symlink); + list_push_tail(destination, new_symlink); + } +} + static void vine_task_string_list_copy(struct list *destination, struct list *string_list) { char *var; @@ -230,6 +243,7 @@ struct vine_task *vine_task_copy(const struct vine_task *task) vine_task_mount_list_copy(new->input_mounts, task->input_mounts); vine_task_mount_list_copy(new->output_mounts, task->output_mounts); + vine_task_symlink_list_copy(new->symlink_list, task->symlink_list); vine_task_string_list_copy(new->env_list, task->env_list); vine_task_string_list_copy(new->feature_list, task->feature_list); new->function_slots_requested = task->function_slots_requested; @@ -636,6 +650,11 @@ int vine_task_add_environment(struct vine_task *t, struct vine_file *f) return vine_task_add_execution_context(t, f); } +void vine_task_add_symlink(struct vine_task *t, const char *name, const char *target) +{ + list_push_tail(t->symlink_list, vine_symlink_create(name, target)); +} + int vine_task_add_execution_context(struct vine_task *t, struct vine_file *context_file) { if (!context_file) { @@ -644,7 +663,7 @@ int vine_task_add_execution_context(struct vine_task *t, struct vine_file *conte } char *env_name = string_format("__vine_env_%s", context_file->cached_name); - vine_task_add_input(t, context_file, env_name, VINE_MOUNT_SYMLINK); + vine_task_add_input(t, context_file, env_name, 0); char *new_cmd = string_format("%s/bin/run_in_env %s", env_name, t->command_line); vine_task_set_command(t, new_cmd); @@ -754,6 +773,9 @@ void vine_task_delete(struct vine_task *t) list_clear(t->output_mounts, (void *)vine_mount_delete); list_delete(t->output_mounts); + list_clear(t->symlink_list, (void *)vine_symlink_delete); + list_delete(t->symlink_list); + list_clear(t->env_list, (void *)free); list_delete(t->env_list); diff --git a/taskvine/src/manager/vine_task.h b/taskvine/src/manager/vine_task.h index 4c53ece509..af586f839c 100644 --- a/taskvine/src/manager/vine_task.h +++ b/taskvine/src/manager/vine_task.h @@ -59,8 +59,9 @@ struct vine_task { int function_slots_requested; /**< If this is a LibraryTask, the number of function slots requested by the user. -1 causes the number of slots to match the number of cores. */ vine_task_func_exec_mode_t func_exec_mode; /**< If this a LibraryTask, the execution mode of its functions. */ - struct list *input_mounts; /**< The mounted files expected as inputs. */ - struct list *output_mounts; /**< The mounted files expected as outputs. */ + struct list *input_mounts; /**< The mounted files expected as inputs. */ + struct list *output_mounts; /**< The mounted files expected as outputs. */ + struct list *symlink_list; /**< Additional symlinks to add to the sandbox namespace. */ struct list *env_list; /**< Environment variables applied to the task. */ struct list *feature_list; /**< User-defined features this task requires. (See vine_worker's --feature option.) */ diff --git a/taskvine/src/worker/vine_sandbox.c b/taskvine/src/worker/vine_sandbox.c index 060869d591..4a5435bef0 100644 --- a/taskvine/src/worker/vine_sandbox.c +++ b/taskvine/src/worker/vine_sandbox.c @@ -9,6 +9,7 @@ See the file COPYING for details. #include "vine_cache_file.h" #include "vine_file.h" #include "vine_mount.h" +#include "vine_symlink.h" #include "vine_task.h" #include "vine_worker.h" @@ -81,20 +82,25 @@ static int stage_input_file(struct vine_process *p, struct vine_mount *m, struct vine_cache_status_t status; status = vine_cache_ensure(cache, f->cached_name); if (status == VINE_CACHE_STATUS_READY) { + /* The sandbox path is permitted to have leading directory elements. */ create_dir_parents(sandbox_path, 0777); - debug(D_VINE, "input: link %s -> %s", cache_path, sandbox_path); - if (m->flags & VINE_MOUNT_SYMLINK) { - /* If the user has requested a symlink, just do that b/c it is faster for large dirs. */ + + if(m->flags&VINE_MOUNT_HARDLINK) { + /* Rare case: If requested, hard-link each element of the cache object */ + /* This can be quite expensive for large directory trees. */ + debug(D_VINE, "input: hardlink %s -> %s", cache_path, sandbox_path); + result = file_link_recursive(cache_path, sandbox_path, 1); + } else { + /* Normally, just symlink from the sandbox to the cache path. */ + /* This is a relatively cheap operation, and most apps won't notice. */ + debug(D_VINE, "input: symlink %s -> %s", cache_path, sandbox_path); result = symlink(cache_path, sandbox_path); /* Change sense of Unix result to true/false. */ result = !result; - } else { - /* Otherwise recursively hard-link the object into the sandbox. */ - result = file_link_recursive(cache_path, sandbox_path, 1); } - if (!result) debug(D_VINE, "couldn't link %s into sandbox as %s: %s", cache_path, sandbox_path, strerror(errno)); + } else { debug(D_VINE, "input: %s is not ready in the cache!", f->cached_name); result = 0; @@ -134,6 +140,7 @@ int vine_sandbox_stagein(struct vine_process *p, struct vine_cache *cache) int result = 1; struct vine_mount *m; + struct vine_symlink *s; /* For each input mount, stage it into the sandbox. */ @@ -144,6 +151,17 @@ int vine_sandbox_stagein(struct vine_process *p, struct vine_cache *cache) break; } + /* For each requested symlink, create it in the sandbox */ + + LIST_ITERATE(t->symlink_list, s) + { + result = symlink(s->target, s->name); + if (result != 0) { + debug(D_VINE, "unable to symlink %s -> %s: %s", s->name, s->target, strerror(errno)); + break; + } + } + /* If any of the output mounts have the MKDIR flag, then create those empty dirs. */ LIST_ITERATE(t->output_mounts, m) diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index dd2eddc555..f0270b752e 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -834,6 +834,8 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t char localname[VINE_LINE_MAX]; char taskname[VINE_LINE_MAX]; char taskname_encoded[VINE_LINE_MAX]; + char target[VINE_LINE_MAX]; + char target_encoded[VINE_LINE_MAX]; char library_name[VINE_LINE_MAX]; char category[VINE_LINE_MAX]; int flags, length; @@ -873,6 +875,10 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t url_decode(taskname_encoded, taskname, VINE_LINE_MAX); vine_hack_do_not_compute_cached_name = 1; vine_task_add_output_file(task, localname, taskname, flags); + } else if (sscanf(line, "symlink %s %s", taskname_encoded, target_encoded) == 2) { + url_decode(taskname_encoded, taskname, VINE_LINE_MAX); + url_decode(target_encoded, target, VINE_LINE_MAX); + vine_task_add_symlink(task, taskname, target); } else if (sscanf(line, "cores %" PRId64, &n)) { vine_task_set_cores(task, n); } else if (sscanf(line, "memory %" PRId64, &n)) { diff --git a/taskvine/test/vine_python.py b/taskvine/test/vine_python.py index ddd40dd956..96397b15d4 100755 --- a/taskvine/test/vine_python.py +++ b/taskvine/test/vine_python.py @@ -116,19 +116,19 @@ def next_output_name(): t = q.wait(wait_time) report_task(t, "success", 0, [path.join(test_dir, output_name)]) - # same thing, but this time symlink the input directory. + # same thing, but this time hardlink the input directory. output_name = next_output_name() t = vine.Task(f"cd my_dir && ./{exec_name} {input_name} 2>&1 > {output_name}") in_dir = q.declare_file(test_dir, cache=True) t.add_input(exec_file, exec_name) - t.add_input(in_dir, "my_dir", mount_symlink=True) + t.add_input(in_dir, "my_dir", mount_hardlink=True) output_file = q.declare_file(path.join(test_dir, output_name), cache=False) t.add_output(output_file, path.join("my_dir", output_name)) q.submit(t) t = q.wait(wait_time) report_task(t, "success", 0, [path.join(test_dir, output_name)]) - + # we bring back the outputs from a directory: output_name = next_output_name() t = vine.Task(f"mkdir outs && ./{exec_name} {input_name} 2>&1 > outs/{output_name}") @@ -141,6 +141,19 @@ def next_output_name(): t = q.wait(wait_time) report_task(t, "success", 0, [path.join(test_dir, "outs", output_name)]) + # use symlink to access a shared filesystem: + output_name = next_output_name() + t = vine.Task(f"mkdir outs && ./{exec_name} {input_name} 2>&1 > outs/{output_name}") + t.add_symlink("input","infile" ) + t.add_symlink("output","outfile" ) + + outs = q.declare_file(path.join(test_dir, "outs"), cache=False) + t.add_output(outs, "outs") + + q.submit(t) + t = q.wait(wait_time) + report_task(t, "success", 0, [path.join(test_dir, "outs", output_name)]) + # Execute a task that only communicates through buffers: inbuf = q.declare_buffer(bytes("This is only a test!", "utf-8")) outbuf1 = q.declare_buffer()