Skip to content

Commit

Permalink
add support for fork and direct
Browse files Browse the repository at this point in the history
  • Loading branch information
tphung3 committed Oct 29, 2024
1 parent feca3d4 commit 8e4429d
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 13 deletions.
27 changes: 19 additions & 8 deletions poncho/src/poncho/library_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@
import socket
from threadpoolctl import threadpool_limits

def dd(msg):
if os.path.isfile('/tmp/tmp.log'):
mode = 'a'
else:
mode = 'w'
timestamp = datetime.now().strftime("%m/%d/%y %H:%M:%S.%f")
with open('/tmp/tmp.log', mode) as f:
print(timestamp, msg, file=f)

# self-pipe to turn a sigchld signal when a child finishes execution
# into an I/O event.
r, w = os.pipe()
Expand Down Expand Up @@ -302,7 +311,7 @@ def main():
help="pid of main vine worker to send sigchild to let it know theres some result.",
)
args = parser.parse_args()

dd('args parsed ok')
# check if library cores and function slots are valid
if args.function_slots > args.library_cores:
stdout_timed_message("error: function slots cannot be more than library cores")
Expand Down Expand Up @@ -351,19 +360,20 @@ def main():
# read in information about this library
with open('library_info.clpk', 'rb') as f:
library_info = cloudpickle.load(f)

dd('library_info loaded ok')
# register functions in this library to the global namespace
for func_name in library_info['function_list']:
func_code = remote_execute(cloudpickle.loads(library_info['function_list'][func_name]))
globals()[func_name] = func_code

dd(f'f{func_name} registered')
# load and execute this library's context
library_context_info = cloudpickle.loads(library_info['context_info'])
context_func = library_context_info[0]
context_args = library_context_info[1]
context_kwargs = library_context_info[2]
context_func(*context_args, **context_kwargs)

if library_context_info:
context_func = library_context_info[0]
context_args = library_context_info[1]
context_kwargs = library_context_info[2]
context_func(*context_args, **context_kwargs)
dd('library-context-info loaded ok')
# send configuration of library, just its name for now
config = {
"name": library_info['library_name']
Expand Down Expand Up @@ -427,6 +437,7 @@ def main():


if __name__ == '__main__':
dd('start main')
main()


Expand Down
8 changes: 7 additions & 1 deletion poncho/src/poncho/package_serverize.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ def pack_library_code(path, envpath):
# @param function_list A list of functions in the library
# @param poncho_env The name of an already prepared poncho environment
# @param init_command A string describing a shell command to execute before the library task is run
# @param add_env Whether to automatically create and/or add environment to the library
# @param add_env Whether to automatically create and/or add environment to the library
# @param exec_mode The execution mode of functions in this library.
# @param hoisting_modules A list of modules imported at the preamble of library, including packages, functions and classes.
# @param library_context_info A list containing [library_context_func, library_context_args, library_context_kwargs]. Used to create the library context on remote nodes.
# @return A hash value.
Expand All @@ -183,6 +184,7 @@ def generate_library_hash(library_name,
poncho_env,
init_command,
add_env,
exec_mode,
hoisting_modules,
library_context_info):
library_info = [library_name]
Expand Down Expand Up @@ -221,6 +223,7 @@ def generate_library_hash(library_name,
library_info.append(str(poncho_env))
library_info.append(str(init_command))
library_info.append(str(add_env))
library_info.append(str(exec_mode))
library_info.append(str(hoisting_modules))

if library_context_info:
Expand Down Expand Up @@ -287,6 +290,7 @@ def generate_taskvine_library_code(library_path, hoisting_modules=None):
# @param functions list of functions to include in the library
# @param library_name name of the library
# @param need_pack whether to create a poncho environment tarball
# @param exec_mode execution mode of functions in this library
# @param hoisting_modules a list of modules to be imported at the preamble of library
# @param library_context_info a list containing a library's context to be created remotely
# @return name of the file containing serialized information about the library
Expand All @@ -297,6 +301,7 @@ def generate_library(library_cache_path,
functions,
library_name,
need_pack=True,
exec_mode='fork',
hoisting_modules=None,
library_context_info=None
):
Expand All @@ -306,6 +311,7 @@ def generate_library(library_cache_path,
for func in functions:
library_info['function_list'][func.__name__] = cloudpickle.dumps(func)
library_info['library_name'] = library_name
library_info['exec_mode'] = exec_mode
library_info['context_info'] = cloudpickle.dumps(library_context_info)
with open(library_info_path, 'wb') as f:
cloudpickle.dump(library_info, f)
Expand Down
10 changes: 9 additions & 1 deletion taskvine/src/bindings/python3/ndcctools/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,14 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env
raise ValueError('A library cannot have 0 functions.')

# Create a unique hash of a library from all information that determine a library's uniqueness.
library_hash = package_serverize.generate_library_hash(library_name, function_list, poncho_env, init_command, add_env, hoisting_modules, library_context_info)
library_hash = package_serverize.generate_library_hash(library_name=library_name,
function_list=function_list,
poncho_env=poncho_env,
init_command=init_command,
add_env=add_env,
exec_mode=exec_mode,
hoisting_modules=hoisting_modules,
library_context_info=library_context_info)

# Create path for caching library code and environment based on function hash.
library_cache_dir_name = "vine-library-cache"
Expand Down Expand Up @@ -989,6 +996,7 @@ def create_library_from_functions(self, library_name, *function_list, poncho_env
functions=function_list,
library_name=library_name,
need_pack=need_pack,
exec_mode=exec_mode,
hoisting_modules=hoisting_modules,
library_context_info=library_context_info)

Expand Down
7 changes: 4 additions & 3 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -2728,10 +2728,11 @@ static vine_result_code_t start_one_task(struct vine_manager *q, struct vine_wor
*/

if (t->provides_library) {
if (t->function_slots_requested <= 0) {
t->function_slots_total = limits->cores;
} else if (!strncmp(t->func_exec_mode, "direct", strlen("direct"))) {
if (!strncmp(t->func_exec_mode, "direct", strlen("direct"))) {
t->function_slots_total = 1;
}
else if (t->function_slots_requested <= 0) {
t->function_slots_total = limits->cores;
} else {
t->function_slots_total = t->function_slots_requested;
}
Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ struct vine_task *vine_task_copy(const struct vine_task *task)
vine_task_set_library_required(new, task->needs_library);
if (task->provides_library)
vine_task_set_library_provided(new, task->provides_library);
if (task->func_exec_mode)
vine_task_set_function_exec_mode(new, task->func_exec_mode);
if (task->tag)
vine_task_set_tag(new, task->tag);
if (task->category)
Expand Down

0 comments on commit 8e4429d

Please sign in to comment.