From 8e4429ddfac0717532dd4ca81b0d1747629fecac Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Tue, 29 Oct 2024 07:58:11 -0400 Subject: [PATCH] add support for fork and direct --- poncho/src/poncho/library_network_code.py | 27 +++++++++++++------ poncho/src/poncho/package_serverize.py | 8 +++++- .../python3/ndcctools/taskvine/manager.py | 10 ++++++- taskvine/src/manager/vine_manager.c | 7 ++--- taskvine/src/manager/vine_task.c | 2 ++ 5 files changed, 41 insertions(+), 13 deletions(-) diff --git a/poncho/src/poncho/library_network_code.py b/poncho/src/poncho/library_network_code.py index 8a0cc263c8..ff2bc9c9c6 100755 --- a/poncho/src/poncho/library_network_code.py +++ b/poncho/src/poncho/library_network_code.py @@ -22,6 +22,15 @@ import socket from threadpoolctl import threadpool_limits +def dd(msg): + if os.path.isfile('/tmp/tmp.log'): + mode = 'a' + else: + mode = 'w' + timestamp = datetime.now().strftime("%m/%d/%y %H:%M:%S.%f") + with open('/tmp/tmp.log', mode) as f: + print(timestamp, msg, file=f) + # self-pipe to turn a sigchld signal when a child finishes execution # into an I/O event. r, w = os.pipe() @@ -302,7 +311,7 @@ def main(): help="pid of main vine worker to send sigchild to let it know theres some result.", ) args = parser.parse_args() - + dd('args parsed ok') # check if library cores and function slots are valid if args.function_slots > args.library_cores: stdout_timed_message("error: function slots cannot be more than library cores") @@ -351,19 +360,20 @@ def main(): # read in information about this library with open('library_info.clpk', 'rb') as f: library_info = cloudpickle.load(f) - + dd('library_info loaded ok') # register functions in this library to the global namespace for func_name in library_info['function_list']: func_code = remote_execute(cloudpickle.loads(library_info['function_list'][func_name])) globals()[func_name] = func_code - + dd(f'f{func_name} registered') # load and execute this library's context library_context_info = cloudpickle.loads(library_info['context_info']) - context_func = library_context_info[0] - context_args = library_context_info[1] - context_kwargs = library_context_info[2] - context_func(*context_args, **context_kwargs) - + if library_context_info: + context_func = library_context_info[0] + context_args = library_context_info[1] + context_kwargs = library_context_info[2] + context_func(*context_args, **context_kwargs) + dd('library-context-info loaded ok') # send configuration of library, just its name for now config = { "name": library_info['library_name'] @@ -427,6 +437,7 @@ def main(): if __name__ == '__main__': + dd('start main') main() diff --git a/poncho/src/poncho/package_serverize.py b/poncho/src/poncho/package_serverize.py index 8eb40af154..6eb5cfe202 100755 --- a/poncho/src/poncho/package_serverize.py +++ b/poncho/src/poncho/package_serverize.py @@ -174,7 +174,8 @@ def pack_library_code(path, envpath): # @param function_list A list of functions in the library # @param poncho_env The name of an already prepared poncho environment # @param init_command A string describing a shell command to execute before the library task is run -# @param add_env Whether to automatically create and/or add environment to the library +# @param add_env Whether to automatically create and/or add environment to the library +# @param exec_mode The execution mode of functions in this library. # @param hoisting_modules A list of modules imported at the preamble of library, including packages, functions and classes. # @param library_context_info A list containing [library_context_func, library_context_args, library_context_kwargs]. Used to create the library context on remote nodes. # @return A hash value. @@ -183,6 +184,7 @@ def generate_library_hash(library_name, poncho_env, init_command, add_env, + exec_mode, hoisting_modules, library_context_info): library_info = [library_name] @@ -221,6 +223,7 @@ def generate_library_hash(library_name, library_info.append(str(poncho_env)) library_info.append(str(init_command)) library_info.append(str(add_env)) + library_info.append(str(exec_mode)) library_info.append(str(hoisting_modules)) if library_context_info: @@ -287,6 +290,7 @@ def generate_taskvine_library_code(library_path, hoisting_modules=None): # @param functions list of functions to include in the library # @param library_name name of the library # @param need_pack whether to create a poncho environment tarball +# @param exec_mode execution mode of functions in this library # @param hoisting_modules a list of modules to be imported at the preamble of library # @param library_context_info a list containing a library's context to be created remotely # @return name of the file containing serialized information about the library @@ -297,6 +301,7 @@ def generate_library(library_cache_path, functions, library_name, need_pack=True, + exec_mode='fork', hoisting_modules=None, library_context_info=None ): @@ -306,6 +311,7 @@ def generate_library(library_cache_path, for func in functions: library_info['function_list'][func.__name__] = cloudpickle.dumps(func) library_info['library_name'] = library_name + library_info['exec_mode'] = exec_mode library_info['context_info'] = cloudpickle.dumps(library_context_info) with open(library_info_path, 'wb') as f: cloudpickle.dump(library_info, f) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py b/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py index 37266796a5..96c044fb05 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py @@ -943,7 +943,14 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env raise ValueError('A library cannot have 0 functions.') # Create a unique hash of a library from all information that determine a library's uniqueness. - library_hash = package_serverize.generate_library_hash(library_name, function_list, poncho_env, init_command, add_env, hoisting_modules, library_context_info) + library_hash = package_serverize.generate_library_hash(library_name=library_name, + function_list=function_list, + poncho_env=poncho_env, + init_command=init_command, + add_env=add_env, + exec_mode=exec_mode, + hoisting_modules=hoisting_modules, + library_context_info=library_context_info) # Create path for caching library code and environment based on function hash. library_cache_dir_name = "vine-library-cache" @@ -989,6 +996,7 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env functions=function_list, library_name=library_name, need_pack=need_pack, + exec_mode=exec_mode, hoisting_modules=hoisting_modules, library_context_info=library_context_info) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 1b557e98a0..f4229be0d8 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -2728,10 +2728,11 @@ static vine_result_code_t start_one_task(struct vine_manager *q, struct vine_wor */ if (t->provides_library) { - if (t->function_slots_requested <= 0) { - t->function_slots_total = limits->cores; - } else if (!strncmp(t->func_exec_mode, "direct", strlen("direct"))) { + if (!strncmp(t->func_exec_mode, "direct", strlen("direct"))) { t->function_slots_total = 1; + } + else if (t->function_slots_requested <= 0) { + t->function_slots_total = limits->cores; } else { t->function_slots_total = t->function_slots_requested; } diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index b7a4122c5b..d67b1abdae 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -211,6 +211,8 @@ struct vine_task *vine_task_copy(const struct vine_task *task) vine_task_set_library_required(new, task->needs_library); if (task->provides_library) vine_task_set_library_provided(new, task->provides_library); + if (task->func_exec_mode) + vine_task_set_function_exec_mode(new, task->func_exec_mode); if (task->tag) vine_task_set_tag(new, task->tag); if (task->category)