Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Vine Shared Filesystem Access via Symlinks #3976

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __del__(self):
pass

@staticmethod
def _determine_mount_flags(watch=False, failure_only=False, success_only=False, strict_input=False, mount_symlink=False):
def _determine_mount_flags(watch=False, failure_only=False, success_only=False, strict_input=False ):
flags = cvine.VINE_TRANSFER_ALWAYS
if watch:
flags |= cvine.VINE_WATCH
Expand All @@ -136,8 +136,6 @@ def _determine_mount_flags(watch=False, failure_only=False, success_only=False,
flags |= cvine.VINE_SUCCESS_ONLY
if strict_input:
flags |= cvine.VINE_FIXED_LOCATION
if mount_symlink:
flags |= cvine.VINE_MOUNT_SYMLINK
return flags

@staticmethod
Expand Down Expand Up @@ -341,12 +339,12 @@ def add_feature(self, name):
# >>> f = m.declare_untar(url)
# >>> task.add_input(f,"data")
# @endcode
def add_input(self, file, remote_name, strict_input=False, mount_symlink=False):
def add_input(self, file, remote_name, strict_input=False ):
# SWIG expects strings
if not isinstance(remote_name, str):
raise TypeError(f"remote_name {remote_name} is not a str")

flags = Task._determine_mount_flags(strict_input=strict_input, mount_symlink=mount_symlink)
flags = Task._determine_mount_flags(strict_input=strict_input)

if cvine.vine_task_add_input(self._task, file._file, remote_name, flags) == 0:
raise ValueError("invalid file description")
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ SOURCES = \
vine_task.c \
vine_file.c \
vine_mount.c \
vine_symlink.c \
vine_txn_log.c \
vine_taskgraph_log.c \
vine_cached_name.c \
Expand Down
10 changes: 9 additions & 1 deletion taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ typedef enum {
VINE_SUCCESS_ONLY = 8, /**< Only return this output file if the task succeeded. */
VINE_RETRACT_ON_RESET = 16, /**< Remove this file from the mount lists if the task is reset. (TaskVine internal
use only.) */
VINE_MOUNT_SYMLINK = 32, /**< Permit this directory to be mounted via symlink instead of hardlink. */
VINE_MOUNT_MKDIR = 64 /**< Create this empty output directory in the task sandbox prior to execution. */
} vine_mount_flags_t;

Expand Down Expand Up @@ -446,6 +445,15 @@ feature.
*/
void vine_task_add_feature(struct vine_task *t, const char *name);

/** Add a symbolic link to the sandbox namespace of the task.
This can be used to access a file within a shared filesystem,
without causing the file to be transferred using the taskvine caching infrastructure.
@param t A task object;
@param name The name of the symlink; must be a relative path in the task sandbox.
@param target The target of the symlink; must be an absolute path to an outside filesystem.
*/
void vine_task_add_symlink(struct vine_task *t, const char *name, const char *target);

/** Specify the priority of this task relative to others in the manager.
Tasks with a higher priority value run first. If no priority is given, a task is placed at the end of the ready list,
regardless of the priority.
Expand Down
13 changes: 13 additions & 0 deletions taskvine/src/manager/vine_manager_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ See the file COPYING for details.
#include "vine_file_replica_table.h"
#include "vine_mount.h"
#include "vine_protocol.h"
#include "vine_symlink.h"
#include "vine_task.h"
#include "vine_txn_log.h"
#include "vine_worker_info.h"
Expand Down Expand Up @@ -560,6 +561,18 @@ vine_result_code_t vine_manager_put_task(
}
}

if (t->symlink_list) {
struct vine_symlink *s;
LIST_ITERATE(t->symlink_list, s)
{
char name_encoded[PATH_MAX];
char target_encoded[PATH_MAX];
url_encode(s->name, name_encoded, PATH_MAX);
url_encode(s->target, target_encoded, PATH_MAX);
vine_manager_send(q, w, "symlink %s %s\n", name_encoded, target_encoded);
}
}

// vine_manager_send returns the number of bytes sent, or a number less than
// zero to indicate errors. We are lazy here, we only check the last
// message we sent to the worker (other messages may have failed above).
Expand Down
27 changes: 27 additions & 0 deletions taskvine/src/manager/vine_symlink.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@

#include "vine_symlink.h"

#include "xxmalloc.h"
#include <stdlib.h>

struct vine_symlink *vine_symlink_create(const char *name, const char *target)
{
struct vine_symlink *s = malloc(sizeof(*s));
s->name = xxstrdup(name);
s->target = xxstrdup(target);
return s;
}

struct vine_symlink *vine_symlink_copy(struct vine_symlink *s)
{
return vine_symlink_create(s->name, s->target);
}

void vine_symlink_delete(struct vine_symlink *s)
{
if (!s)
return;
free(s->name);
free(s->target);
free(s);
}
29 changes: 29 additions & 0 deletions taskvine/src/manager/vine_symlink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright (C) 2024- The University of Notre Dame
This software is distributed under the GNU General Public License.
See the file COPYING for details.
*/

#ifndef VINE_SYMLINK_H
#define VINE_SYMLINK_H

/*
A vine symlink is a lightweight structure representing a symlink
to be added to the sandbox namespace of a task as it runs.
Note that a symlink is not treated as either an input file
or an output file in the vine infrastructure, because there are
no file contents to cache or move. It is simply an addition
to the task namespace performed just prior to execution.
*/

struct vine_symlink {
char *name;
char *target;
};

struct vine_symlink * vine_symlink_create( const char *name, const char *target );
struct vine_symlink * vine_symlink_copy( struct vine_symlink *s );
void vine_symlink_delete( struct vine_symlink *s );

#endif

24 changes: 23 additions & 1 deletion taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ See the file COPYING for details.
#include "vine_file.h"
#include "vine_manager.h"
#include "vine_mount.h"
#include "vine_symlink.h"
#include "vine_worker_info.h"

#include "debug.h"
Expand Down Expand Up @@ -51,6 +52,7 @@ struct vine_task *vine_task_create(const char *command_line)

t->input_mounts = list_create();
t->output_mounts = list_create();
t->symlink_list = list_create();
t->env_list = list_create();
t->feature_list = list_create();

Expand Down Expand Up @@ -177,6 +179,17 @@ static void vine_task_mount_list_copy(struct list *destination, struct list *lis
}
}

static void vine_task_symlink_list_copy(struct list *destination, struct list *list)
{
struct vine_symlink *old_symlink, *new_symlink;

LIST_ITERATE(list, old_symlink)
{
new_symlink = vine_symlink_copy(old_symlink);
list_push_tail(destination, new_symlink);
}
}

static void vine_task_string_list_copy(struct list *destination, struct list *string_list)
{
char *var;
Expand Down Expand Up @@ -230,6 +243,7 @@ struct vine_task *vine_task_copy(const struct vine_task *task)

vine_task_mount_list_copy(new->input_mounts, task->input_mounts);
vine_task_mount_list_copy(new->output_mounts, task->output_mounts);
vine_task_symlink_list_copy(new->symlink_list, task->symlink_list);
vine_task_string_list_copy(new->env_list, task->env_list);
vine_task_string_list_copy(new->feature_list, task->feature_list);
new->function_slots_requested = task->function_slots_requested;
Expand Down Expand Up @@ -636,6 +650,11 @@ int vine_task_add_environment(struct vine_task *t, struct vine_file *f)
return vine_task_add_execution_context(t, f);
}

void vine_task_add_symlink(struct vine_task *t, const char *name, const char *target)
{
list_push_tail(t->symlink_list, vine_symlink_create(name, target));
}

int vine_task_add_execution_context(struct vine_task *t, struct vine_file *context_file)
{
if (!context_file) {
Expand All @@ -644,7 +663,7 @@ int vine_task_add_execution_context(struct vine_task *t, struct vine_file *conte
}

char *env_name = string_format("__vine_env_%s", context_file->cached_name);
vine_task_add_input(t, context_file, env_name, VINE_MOUNT_SYMLINK);
vine_task_add_input(t, context_file, env_name, 0);

char *new_cmd = string_format("%s/bin/run_in_env %s", env_name, t->command_line);
vine_task_set_command(t, new_cmd);
Expand Down Expand Up @@ -754,6 +773,9 @@ void vine_task_delete(struct vine_task *t)
list_clear(t->output_mounts, (void *)vine_mount_delete);
list_delete(t->output_mounts);

list_clear(t->symlink_list, (void *)vine_symlink_delete);
list_delete(t->symlink_list);

list_clear(t->env_list, (void *)free);
list_delete(t->env_list);

Expand Down
5 changes: 3 additions & 2 deletions taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ struct vine_task {
int function_slots_requested; /**< If this is a LibraryTask, the number of function slots requested by the user. -1 causes the number of slots to match the number of cores. */
vine_task_func_exec_mode_t func_exec_mode; /**< If this a LibraryTask, the execution mode of its functions. */

struct list *input_mounts; /**< The mounted files expected as inputs. */
struct list *output_mounts; /**< The mounted files expected as outputs. */
struct list *input_mounts; /**< The mounted files expected as inputs. */
struct list *output_mounts; /**< The mounted files expected as outputs. */
struct list *symlink_list; /**< Additional symlinks to add to the sandbox namespace. */
struct list *env_list; /**< Environment variables applied to the task. */
struct list *feature_list; /**< User-defined features this task requires. (See vine_worker's --feature option.) */

Expand Down
27 changes: 17 additions & 10 deletions taskvine/src/worker/vine_sandbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ See the file COPYING for details.
#include "vine_cache_file.h"
#include "vine_file.h"
#include "vine_mount.h"
#include "vine_symlink.h"
#include "vine_task.h"
#include "vine_worker.h"

Expand Down Expand Up @@ -83,18 +84,12 @@ static int stage_input_file(struct vine_process *p, struct vine_mount *m, struct
if (status == VINE_CACHE_STATUS_READY) {
create_dir_parents(sandbox_path, 0777);
debug(D_VINE, "input: link %s -> %s", cache_path, sandbox_path);
if (m->flags & VINE_MOUNT_SYMLINK) {
/* If the user has requested a symlink, just do that b/c it is faster for large dirs. */
result = symlink(cache_path, sandbox_path);
/* Change sense of Unix result to true/false. */
result = !result;
} else {
/* Otherwise recursively hard-link the object into the sandbox. */
result = file_link_recursive(cache_path, sandbox_path, 1);
}

result = symlink(cache_path, sandbox_path);
/* Change sense of Unix result to true/false. */
result = !result;
if (!result)
debug(D_VINE, "couldn't link %s into sandbox as %s: %s", cache_path, sandbox_path, strerror(errno));

} else {
debug(D_VINE, "input: %s is not ready in the cache!", f->cached_name);
result = 0;
Expand Down Expand Up @@ -134,6 +129,7 @@ int vine_sandbox_stagein(struct vine_process *p, struct vine_cache *cache)
int result = 1;

struct vine_mount *m;
struct vine_symlink *s;

/* For each input mount, stage it into the sandbox. */

Expand All @@ -144,6 +140,17 @@ int vine_sandbox_stagein(struct vine_process *p, struct vine_cache *cache)
break;
}

/* For each requested symlink, create it in the sandbox */

LIST_ITERATE(t->symlink_list, s)
{
result = symlink(s->target, s->name);
if (result != 0) {
debug(D_VINE, "unable to symlink %s -> %s: %s", s->name, s->target, strerror(errno));
break;
}
}

/* If any of the output mounts have the MKDIR flag, then create those empty dirs. */

LIST_ITERATE(t->output_mounts, m)
Expand Down
6 changes: 6 additions & 0 deletions taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,8 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t
char localname[VINE_LINE_MAX];
char taskname[VINE_LINE_MAX];
char taskname_encoded[VINE_LINE_MAX];
char target[VINE_LINE_MAX];
char target_encoded[VINE_LINE_MAX];
char library_name[VINE_LINE_MAX];
char category[VINE_LINE_MAX];
int flags, length;
Expand Down Expand Up @@ -873,6 +875,10 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t
url_decode(taskname_encoded, taskname, VINE_LINE_MAX);
vine_hack_do_not_compute_cached_name = 1;
vine_task_add_output_file(task, localname, taskname, flags);
} else if (sscanf(line, "symlink %s %s", taskname_encoded, target_encoded) == 2) {
url_decode(taskname_encoded, taskname, VINE_LINE_MAX);
url_decode(target_encoded, target, VINE_LINE_MAX);
vine_task_add_symlink(task, taskname, target);
} else if (sscanf(line, "cores %" PRId64, &n)) {
vine_task_set_cores(task, n);
} else if (sscanf(line, "memory %" PRId64, &n)) {
Expand Down
13 changes: 0 additions & 13 deletions taskvine/test/vine_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,6 @@ def next_output_name():
t = q.wait(wait_time)
report_task(t, "success", 0, [path.join(test_dir, output_name)])

# same thing, but this time symlink the input directory.
output_name = next_output_name()
t = vine.Task(f"cd my_dir && ./{exec_name} {input_name} 2>&1 > {output_name}")
in_dir = q.declare_file(test_dir, cache=True)
t.add_input(exec_file, exec_name)
t.add_input(in_dir, "my_dir", mount_symlink=True)
output_file = q.declare_file(path.join(test_dir, output_name), cache=False)
t.add_output(output_file, path.join("my_dir", output_name))

q.submit(t)
t = q.wait(wait_time)
report_task(t, "success", 0, [path.join(test_dir, output_name)])

# we bring back the outputs from a directory:
output_name = next_output_name()
t = vine.Task(f"mkdir outs && ./{exec_name} {input_name} 2>&1 > outs/{output_name}")
Expand Down
Loading