Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Robust Error Checking to Library and Worker's Communication Patterns #3974

Merged
merged 9 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions doc/manuals/taskvine/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2815,13 +2815,24 @@ The `compute` call above may receive the following keyword arguments:
| resources\_mode | [Automatic resource management](#automatic-resource-management) to use, e.g., "fixed", "max", or "max throughput"|
| task\_mode | Mode used to execute individual tasks, such as [function calls](#serverless-computing), e.g., "tasks" or "function-calls"|

## Appendix for Developers

### Library - Worker Communication Patterns

### Further Information
This subsection describes the communication patterns between a library and a worker, independent of the programming language the library is implemented in.

Upon startup, a library should send its worker a JSON object as a byte stream.
The JSON object should have the following keys and value types: `{"name": type-string, "taskid": type-int, "exec_mode": type-string}`.
`"name"` should be the name of the library.
`"taskid"` should be the library's task ID as assigned by the TaskVine manager.
`"exec_mode"` should be the function execution mode of the library (`"direct"` or `"fork"`).
Upon receiving a well-formed library startup message, a worker should check every key against what it knows about the library, and mark the library as ready to receive function calls only if the library passes this startup check.

## Further Information

For more information, please see [Getting Help](../help.md) or visit the [Cooperative Computing Lab](http://ccl.cse.nd.edu) website.

### Copyright
## Copyright

CCTools is Copyright (C) 2022 The University of Notre Dame. This software is distributed under the GNU General Public License Version 2. See the file COPYING for
details.
12 changes: 7 additions & 5 deletions poncho/src/poncho/library_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,16 +379,18 @@ def main():
if context_vars:
(load_variable_from_library.__globals__).update(context_vars)

# set execution mode of functions in this library
global exec_method
exec_method = library_info['exec_mode']

# send configuration of library: its name, task id, and execution mode
config = {
"name": library_info['library_name']
"name": library_info['library_name'],
"taskid": args.task_id,
"exec_mode": exec_method,
}
send_configuration(config, out_pipe_fd, args.worker_pid)

# set execution mode of functions in this library
global exec_method
exec_method = library_info['exec_mode']

# register sigchld handler to turn a sigchld signal into an I/O event
signal.signal(signal.SIGCHLD, sigchld_handler)

Expand Down
7 changes: 5 additions & 2 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1461,9 +1461,10 @@ static vine_msg_code_t handle_taskvine(struct vine_manager *q, struct vine_worke
{
char items[4][VINE_LINE_MAX];
int worker_protocol;
int worker_library_protocol;

int n = sscanf(line, "taskvine %d %s %s %s %s", &worker_protocol, items[0], items[1], items[2], items[3]);
if (n != 5)
int n = sscanf(line, "taskvine %d %d %s %s %s %s", &worker_protocol, &worker_library_protocol, items[0], items[1], items[2], items[3]);
tphung3 marked this conversation as resolved.
Show resolved Hide resolved
if (n != 6)
return VINE_MSG_FAILURE;

if (worker_protocol != VINE_PROTOCOL_VERSION) {
Expand All @@ -1472,6 +1473,8 @@ static vine_msg_code_t handle_taskvine(struct vine_manager *q, struct vine_worke
return VINE_MSG_FAILURE;
}

w->library_protocol_version = worker_library_protocol;

if (w->hostname)
free(w->hostname);
if (w->os)
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_manager_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ vine_result_code_t vine_manager_put_task(
if (t->provides_library) {
vine_manager_send(q, w, "provides_library %s\n", t->provides_library);
vine_manager_send(q, w, "function_slots %d\n", t->function_slots_total);
vine_manager_send(q, w, "func_exec_mode %d\n", t->func_exec_mode);
}

vine_manager_send(q, w, "category %s\n", t->category);
Expand Down
4 changes: 3 additions & 1 deletion taskvine/src/manager/vine_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ worker, and catalog, but should not be visible to the public user API.
#ifndef VINE_PROTOCOL_H
#define VINE_PROTOCOL_H

#define VINE_PROTOCOL_VERSION 11
#define VINE_PROTOCOL_VERSION 12

#define VINE_LIBRARY_PROTOCOL_VERSION 1
tphung3 marked this conversation as resolved.
Show resolved Hide resolved

#define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */

Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/vine_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ See the file COPYING for details.
#include "vine_file.h"
#include "vine_file_replica.h"
#include "vine_mount.h"
#include "vine_protocol.h"

#include "debug.h"
#include "hash_table.h"
Expand Down Expand Up @@ -203,6 +204,12 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
return 0;
}

/* Check if the library and the worker have the same library protocol version. */
if (t->provides_library && (w->library_protocol_version != VINE_LIBRARY_PROTOCOL_VERSION)) {
debug(D_VINE, "Worker %s can't run library with id %d due to mismatched library protocol version.", w->workerid, t->task_id);
return 0;
}

/* Compute the resources to allocate to this task. */
struct rmsummary *l = vine_manager_choose_resources_for_task(q, w, t);

Expand Down
32 changes: 27 additions & 5 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,7 @@ void vine_task_set_function_exec_mode(struct vine_task *t, vine_task_func_exec_m
void vine_task_set_function_exec_mode_from_string(struct vine_task *t, const char *exec_mode)
{
if (exec_mode && t->provides_library) {
if (!strncmp(exec_mode, "fork", strlen("fork"))) {
t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_FORK;
} else {
t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_DIRECT;
}
t->func_exec_mode = vine_task_func_exec_mode_from_string(exec_mode);
}
}

Expand Down Expand Up @@ -1005,3 +1001,29 @@ char *vine_task_to_json(struct vine_task *t)
buffer_free(&b);
return json;
}

tphung3 marked this conversation as resolved.
Show resolved Hide resolved
/* Maps an int64_t onto a vine_task_func_exec_mode_t.
 * 1 selects VINE_TASK_FUNC_EXEC_MODE_DIRECT and 2 selects VINE_TASK_FUNC_EXEC_MODE_FORK.
 * Every other value yields VINE_TASK_FUNC_EXEC_MODE_INVALID. */
vine_task_func_exec_mode_t vine_task_func_exec_mode_from_int64_t(int64_t n)
{
	switch (n) {
	case 1:
		return VINE_TASK_FUNC_EXEC_MODE_DIRECT;
	case 2:
		return VINE_TASK_FUNC_EXEC_MODE_FORK;
	default:
		return VINE_TASK_FUNC_EXEC_MODE_INVALID;
	}
}

/* Converts a string to a valid vine_task_func_exec_mode_t.
 *
 * exec_mode: name of the execution mode; may be NULL.
 *
 * Returns VINE_TASK_FUNC_EXEC_MODE_DIRECT for exactly "direct",
 * VINE_TASK_FUNC_EXEC_MODE_FORK for exactly "fork", and
 * VINE_TASK_FUNC_EXEC_MODE_INVALID for NULL or any other string. */
vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode)
{
	/* Passing NULL to the string comparison functions is undefined behavior. */
	if (!exec_mode) {
		return VINE_TASK_FUNC_EXEC_MODE_INVALID;
	}

	/* Compare the whole string: a prefix match would accept names such as
	 * "forked" or "directory" as valid modes. */
	if (!strcmp(exec_mode, "direct")) {
		return VINE_TASK_FUNC_EXEC_MODE_DIRECT;
	}
	if (!strcmp(exec_mode, "fork")) {
		return VINE_TASK_FUNC_EXEC_MODE_FORK;
	}
	return VINE_TASK_FUNC_EXEC_MODE_INVALID;
}
6 changes: 5 additions & 1 deletion taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ typedef enum {
} vine_task_state_t;

typedef enum {
VINE_TASK_FUNC_EXEC_MODE_DIRECT = 0, /**< A library task will execute function calls directly in its process **/
VINE_TASK_FUNC_EXEC_MODE_INVALID = -1,
VINE_TASK_FUNC_EXEC_MODE_DIRECT = 1, /**< A library task will execute function calls directly in its process **/
VINE_TASK_FUNC_EXEC_MODE_FORK, /**< A library task will fork and execute each function call. **/
} vine_task_func_exec_mode_t;

Expand Down Expand Up @@ -164,6 +165,9 @@ const char *vine_task_state_to_string( vine_task_state_t task_state );
struct jx * vine_task_to_jx( struct vine_manager *q, struct vine_task *t );
char * vine_task_to_json(struct vine_task *t);

vine_task_func_exec_mode_t vine_task_func_exec_mode_from_int64_t(int64_t n);
vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode);


/** Attach an input or outputs to tasks without declaring files to manager.
* Only really useful at the worker where tasks are created without a manager. */
Expand Down
3 changes: 3 additions & 0 deletions taskvine/src/manager/vine_worker_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ struct vine_worker_info {
/* Connection to the worker or other client. */
struct link *link;

/* Library protocol version of this worker. */
int library_protocol_version;

/* Static properties reported by worker when it connects. */
char *hostname;
char *os;
Expand Down
35 changes: 29 additions & 6 deletions taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,9 @@ static void report_worker_ready(struct link *manager)
}

send_async_message(manager,
"taskvine %d %s %s %s %d.%d.%d\n",
"taskvine %d %d %s %s %s %d.%d.%d\n",
VINE_PROTOCOL_VERSION,
VINE_LIBRARY_PROTOCOL_VERSION,
hostname,
options->os_name,
options->arch_name,
Expand Down Expand Up @@ -865,6 +866,14 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t
task->function_slots_requested = n;
/* Also set the total number determined by the manager. */
task->function_slots_total = n;
} else if (sscanf(line, "func_exec_mode %" PRId64, &n) == 1) {
vine_task_func_exec_mode_t func_exec_mode = vine_task_func_exec_mode_from_int64_t(n);
if (func_exec_mode == VINE_TASK_FUNC_EXEC_MODE_INVALID) {
debug(D_VINE | D_NOTICE, "invalid func_exec_mode from manager: %s", line);
vine_task_delete(task);
return 0;
}
task->func_exec_mode = func_exec_mode;
} else if (sscanf(line, "infile %s %s %d", localname, taskname_encoded, &flags)) {
url_decode(taskname_encoded, taskname, VINE_LINE_MAX);
vine_hack_do_not_compute_cached_name = 1;
Expand Down Expand Up @@ -1546,10 +1555,25 @@ static int check_library_startup(struct vine_process *p)
struct jx *response = jx_parse_string(buffer);

const char *name = jx_lookup_string(response, "name");
int taskid = jx_lookup_integer(response, "taskid");
const char *exec_mode = jx_lookup_string(response, "exec_mode");

int ok = 1;

int ok = 0;
if (!strcmp(name, p->task->provides_library)) {
ok = 1;
if (!name || !taskid || !exec_mode) {
ok = 0;
} else {
vine_task_func_exec_mode_t converted_exec_mode = vine_task_func_exec_mode_from_string(exec_mode);

if (!p->task->provides_library || strcmp(name, p->task->provides_library)) {
ok = 0;
}
if (taskid != p->task->task_id) {
ok = 0;
}
if (p->task->func_exec_mode && converted_exec_mode != p->task->func_exec_mode) {
ok = 0;
}
}
if (response) {
jx_delete(response);
Expand Down Expand Up @@ -1585,8 +1609,7 @@ static void check_libraries_ready(struct link *manager)
debug(D_VINE, "Library %s reports ready to execute functions.", library_process->task->provides_library);
library_process->library_ready = 1;
} else {
/* Kill library if the name reported back doesn't match its name or
* if there's any problem. */
/* Kill library if it fails the startup check. */
debug(D_VINE,
"Library %s task id %" PRIu64 " verification failed (unexpected response). Killing it.",
library_process->task->provides_library,
Expand Down
Loading