Skip to content

Commit

Permalink
add exec mode to library task
Browse files Browse the repository at this point in the history
  • Loading branch information
tphung3 committed Oct 28, 2024
1 parent eb70ab4 commit feca3d4
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 21 deletions.
2 changes: 1 addition & 1 deletion poncho/src/poncho/library_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ def main():

# register functions in this library to the global namespace
for func_name in library_info['function_list']:
func_code = cloudpickle.loads(library_info['function_list'][func_name])
func_code = remote_execute(cloudpickle.loads(library_info['function_list'][func_name]))
globals()[func_name] = func_code

# load and execute this library's context
Expand Down
25 changes: 8 additions & 17 deletions poncho/src/poncho/package_serverize.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,15 @@ def pack_library_code(path, envpath):
# @param init_command A string describing a shell command to execute before the library task is run
# @param add_env Whether to automatically create and/or add environment to the library
# @param hoisting_modules A list of modules imported at the preamble of library, including packages, functions and classes.
# @param exec_mode Execution mode that the library should use to run function calls. Either 'direct' or 'fork'
# @param library_context_info A list containing [library_context_func, library_context_args, library_context_kwargs]. Used to create the library context on remote nodes.
# @return A hash value.
def generate_library_hash(library_name, function_list, poncho_env, init_command, add_env, hoisting_modules, exec_mode, library_context_info):
def generate_library_hash(library_name,
function_list,
poncho_env,
init_command,
add_env,
hoisting_modules,
library_context_info):
library_info = [library_name]
function_list = list(function_list)
function_names = set()
Expand Down Expand Up @@ -217,7 +222,6 @@ def generate_library_hash(library_name, function_list, poncho_env, init_command,
library_info.append(str(init_command))
library_info.append(str(add_env))
library_info.append(str(hoisting_modules))
library_info.append(str(exec_mode))

if library_context_info:
if isinstance(library_context_info[1], list):
Expand Down Expand Up @@ -284,7 +288,6 @@ def generate_taskvine_library_code(library_path, hoisting_modules=None):
# @param library_name name of the library
# @param need_pack whether to create a poncho environment tarball
# @param hoisting_modules a list of modules to be imported at the preamble of library
# @param exec_mode whether to execute invocations directly or by forking
# @param library_context_info a list containing a library's context to be created remotely
# @return name of the file containing serialized information about the library
def generate_library(library_cache_path,
Expand All @@ -295,7 +298,6 @@ def generate_library(library_cache_path,
library_name,
need_pack=True,
hoisting_modules=None,
library_exec_mode='direct',
library_context_info=None
):
# create library_info.clpk
Expand All @@ -304,7 +306,6 @@ def generate_library(library_cache_path,
for func in functions:
library_info['function_list'][func.__name__] = cloudpickle.dumps(func)
library_info['library_name'] = library_name
library_info['exec_mode'] = library_exec_mode
library_info['context_info'] = cloudpickle.dumps(library_context_info)
with open(library_info_path, 'wb') as f:
cloudpickle.dump(library_info, f)
Expand All @@ -326,18 +327,8 @@ def generate_library(library_cache_path,
# @param library_context_info a list containing a library's context to be created remotely
# @return name of the file containing serialized information about functions
def serverize_library_from_code(
path, functions, name, need_pack=True, hoisting_modules=None, exec_mode='direct', library_context_info=None
path, functions, name, need_pack=True, hoisting_modules=None, exec_mode='fork', library_context_info=None
):
library_info = {}
library_info['function_list'] = {}
for func in functions:
library_info['function_list'][func.__name__] = cloudpickle.dumps(func)

library_info['library_name'] = name
library_info['hoisting_modules'] = hoisting_modules
library_info['exec_mode'] = exec_mode
library_info['context_info'] = cloudpickle.dumps(library_context_info)

with open(f'{path}/library_info.clpk', 'wb') as f:
cloudpickle.dump(library_info, f)

Expand Down
10 changes: 7 additions & 3 deletions taskvine/src/bindings/python3/ndcctools/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ def check_library_exists(self, library_name):
# @param exec_mode Execution mode that the library should use to run function calls. Either 'direct' or 'fork'
# @param library_context_info A list containing [library_context_func, library_context_args, library_context_kwargs]. Used to create the library context on remote nodes.
# @returns A task to be used with @ref ndcctools.taskvine.manager.Manager.install_library.
def create_library_from_functions(self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None, exec_mode='direct', library_context_info=None):
def create_library_from_functions(self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None, exec_mode='fork', library_context_info=None):
# Delay loading of poncho until here, to avoid bringing in poncho dependencies unless needed.
# Ensure poncho python library is available.
from ndcctools.poncho import package_serverize
Expand All @@ -943,7 +943,7 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env
raise ValueError('A library cannot have 0 functions.')

# Create a unique hash of a library from all information that determine a library's uniqueness.
library_hash = package_serverize.generate_library_hash(library_name, function_list, poncho_env, init_command, add_env, hoisting_modules, exec_mode, library_context_info)
library_hash = package_serverize.generate_library_hash(library_name, function_list, poncho_env, init_command, add_env, hoisting_modules, library_context_info)

# Create path for caching library code and environment based on function hash.
library_cache_dir_name = "vine-library-cache"
Expand Down Expand Up @@ -990,7 +990,6 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env
library_name=library_name,
need_pack=need_pack,
hoisting_modules=hoisting_modules,
library_exec_mode=exec_mode,
library_context_info=library_context_info)

# enable correct permissions for library code
Expand All @@ -1012,6 +1011,11 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env
t.add_input(f, library_code_name)
f = self.declare_file(library_info_path, cache=True, peer_transfer=True)
t.add_input(f, library_info_name)

# Register execution mode of functions in this library
t.set_function_exec_mode(exec_mode)
if exec_mode == 'direct':
t.set_function_slots(1)
return t

# does not include default arguments of a function
Expand Down
9 changes: 9 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,15 @@ def provides_library(self, library):
def set_function_slots(self, nslots):
return cvine.vine_task_set_function_slots(self._task, nslots)

##
# Set the execution mode of functions in a library.
# This is not needed for regular tasks.
#
# @param self Reference to the current task object.
# @param exec_mode The execution mode of functions in a library.
def set_function_exec_mode(self, exec_mode):
return cvine.vine_task_set_function_exec_mode(self._task, exec_mode)

##
# Set the worker selection scheduler for task.
#
Expand Down
9 changes: 9 additions & 0 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,15 @@ If unset, the library will runs as many functions as it has cores available.
*/
void vine_task_set_function_slots(struct vine_task *t, int nslots);

/** Set the execution mode of functions inside a library.
A mode can either be "fork" where the library forks and executes functions, or
"direct" where the library executes a function in its memory space.
Note that "direct" will limit the number of function slots in a library to 1.
@param t A library object.
@param exec_mode A string denoting the execution mode of the library.
*/
void vine_task_set_function_exec_mode(struct vine_task *t, const char *exec_mode);

/** Add a general file object as a input to a task.
@param t A task object.
@param f A file object, created by @ref vine_declare_file, @ref vine_declare_url, @ref vine_declare_buffer, @ref
Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -2730,6 +2730,8 @@ static vine_result_code_t start_one_task(struct vine_manager *q, struct vine_wor
if (t->provides_library) {
if (t->function_slots_requested <= 0) {
t->function_slots_total = limits->cores;
} else if (!strncmp(t->func_exec_mode, "direct", strlen("direct"))) {
t->function_slots_total = 1;
} else {
t->function_slots_total = t->function_slots_requested;
}
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,13 @@ void vine_task_set_function_slots(struct vine_task *t, int nslots)
t->function_slots_requested = nslots;
}

void vine_task_set_function_exec_mode(struct vine_task *t, const char *exec_mode)
{
if (exec_mode && t->provides_library) {
t->func_exec_mode = xxstrdup(exec_mode);
}
}

void vine_task_set_env_var(struct vine_task *t, const char *name, const char *value)
{
if (value) {
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct vine_task {
char *needs_library; /**< If this is a FunctionTask, the name of the library used */
char *provides_library; /**< If this is a LibraryTask, the name of the library provided. */
int function_slots_requested; /**< If this is a LibraryTask, the number of function slots requested by the user. -1 causes the number of slots to match the number of cores. */
const char *func_exec_mode; /**< If this a LibraryTask, the execution mode of its functions. */

struct list *input_mounts; /**< The mounted files expected as inputs. */
struct list *output_mounts; /**< The mounted files expected as outputs. */
Expand Down

0 comments on commit feca3d4

Please sign in to comment.