From 24520c9a1324fa798565f759fa46d5f32ae9dc1b Mon Sep 17 00:00:00 2001 From: Colin Thomas <33940547+colinthomas-z80@users.noreply.github.com> Date: Mon, 30 Sep 2024 11:09:16 -0400 Subject: [PATCH 1/8] vine: measure and report worker sandbox used (#3889) * measure sandbox on completion and report to manageR * fix disk check bug * remove task inputs from sandbox measurement * remove debug * update measurement strategy * format * refer to sandbox usage for category minimum * format * move max_disk_use to vine_stats->max_sandbox * use hash_table to encode excluded paths * minimum disk requested is always at least input size * do not add one to sandbox_used * pad with at least bucket_size/2 as we do for first allocations * update wq * treat sandbox as any other disk allocation * add VINE_RESULT_SANDBOX_EXHAUSTION * use padded sandbox as min * treat sandboxes differently than regular allocations otherwise FIXED allocations that did not specify disk are not retried. * format * add API call to enable/disable proportional resources * api * consider min resources for proportion * use user values * disable whole task proportion if specific resource was given * rename max_sandbox -> min_sandbox * do not double measure disk * set minimum disk usage from sandbox * update comments * disk only specified output and ephemeral files * proportion with available disk - cached * use already inuse_cache value in count_worker_resources * use available_disk with t->input_files_size * turn off prop whole tasks only for mem and disk * check for user resources not specified * fix conflict, input_size * format * macros to conver bytes to megabytes, etc. 
* correctly account for inuse_cache units * add DISK to vine_worker test * format --------- Co-authored-by: Benjamin Tovar --- doc/manuals/taskvine/index.md | 6 +- dttools/src/category.c | 5 + dttools/src/category.h | 3 + dttools/src/macros.h | 7 + dttools/src/path_disk_size_info.c | 10 +- dttools/src/path_disk_size_info.h | 8 +- dttools/src/rmonitor_poll.c | 4 +- taskvine/src/manager/taskvine.h | 18 +- .../src/manager/vine_file_replica_table.c | 14 ++ taskvine/src/manager/vine_manager.c | 191 +++++++++++++----- taskvine/src/manager/vine_manager.h | 2 + taskvine/src/manager/vine_protocol.h | 2 +- taskvine/src/manager/vine_schedule.c | 41 +++- taskvine/src/manager/vine_task.c | 1 + taskvine/src/manager/vine_task.h | 3 + taskvine/src/worker/vine_cache_file.c | 2 +- taskvine/src/worker/vine_process.c | 34 +++- taskvine/src/worker/vine_worker.c | 18 +- taskvine/test/TR_vine_single.sh | 1 + taskvine/test/vine_allocations.py | 2 +- taskvine/test/vine_common.sh | 2 +- work_queue/src/work_queue_process.c | 2 +- work_queue/src/work_queue_worker.c | 2 +- 23 files changed, 301 insertions(+), 77 deletions(-) diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md index 2490ad873b..8904d91635 100644 --- a/doc/manuals/taskvine/index.md +++ b/doc/manuals/taskvine/index.md @@ -1928,8 +1928,8 @@ Consider now that the task requires 1 cores, 6GB of memory, and 27 GB of disk: !!! note If you want TaskVine to exactly allocate the resources you have - specified, use the `proportional-resources` and `proportional-whole-tasks` - parameters as shown [here](#tuning-specialized-execution-parameters). In + specified, use `m.disable_proportional_resources()` (see also `proportional-whole-tasks` + [here](#tuning-specialized-execution-parameters)). In general, however, we have found that using proportions nicely adapts to the underlying available resources, and leads to very few resource exhaustion failures while still using worker resources efficiently. 
@@ -2535,10 +2535,10 @@ change. | min-transfer-timeout | Set the minimum number of seconds to wait for files to be transferred to or from a worker. | 10 | | monitor-interval | Maximum number of seconds between resource monitor measurements. If less than 1, use default. | 5 | | prefer-dispatch | If 1, try to dispatch tasks even if there are retrieved tasks ready to be reportedas done. | 0 | -| proportional-resources | If set to 0, do not assign resources proportionally to tasks. The default is to use proportions. (See [task resources.](#task-resources) | 1 | | proportional-whole-tasks | Round up resource proportions such that only an integer number of tasks could be fit in the worker. The default is to use proportions. (See [task resources.](#task-resources) | 1 | | ramp-down-heuristic | If set to 1 and there are more workers than tasks waiting, then tasks are allocated all the free resources of a worker large enough to run them. If monitoring watchdog is not enabled, then this heuristic has no effect. | 0 | | resource-submit-multiplier | Assume that workers have `resource x resources-submit-multiplier` available.
This overcommits resources at the worker, causing tasks to be sent to workers that cannot be immediately executed.
The extra tasks wait at the worker until resources become available. | 1 | +| sandbox-grow-factor | When task disk sandboxes are exhausted, increase the allocation using their measured value times this factor. Minimum is 1.1. | 2 | | short-timeout | Set the minimum timeout in seconds when sending a brief message to a single worker. | 5 | | temp-replica-count | Number of temp file replicas created across workers | 0 | | transfer-outlier-factor | Transfer that are this many times slower than the average will be terminated. | 10 | diff --git a/dttools/src/category.c b/dttools/src/category.c index 29c2c97611..570bba3beb 100644 --- a/dttools/src/category.c +++ b/dttools/src/category.c @@ -921,6 +921,11 @@ const struct rmsummary *category_task_min_resources(struct category *c, struct r /* but don't go below the minimum defined for the category. */ rmsummary_merge_max(internal, c->min_allocation); + /* nor below the observed sandboxes if not in an auto mode */ + if (c->allocation_mode == CATEGORY_ALLOCATION_MODE_FIXED && user && user->disk < 0) { + internal->disk = MAX(internal->disk, c->min_vine_sandbox); + } + return internal; } diff --git a/dttools/src/category.h b/dttools/src/category.h index 0779d4fecf..91b7ed51d9 100644 --- a/dttools/src/category.h +++ b/dttools/src/category.h @@ -104,6 +104,9 @@ struct category { /* stats for taskvine */ struct vine_stats *vine_stats; + /* Max sandbox disk space observed, in MB. This is the minimum sandbox size needed if nothing else is known about the task.*/ + int64_t min_vine_sandbox; + /* variables for makeflow */ /* Mappings between variable names defined in the makeflow file and their values. */ struct hash_table *mf_variables; diff --git a/dttools/src/macros.h b/dttools/src/macros.h index 640c927fb8..43e617f8c3 100644 --- a/dttools/src/macros.h +++ b/dttools/src/macros.h @@ -39,6 +39,13 @@ See the file COPYING for details. 
#define TERABYTE TERA #define PETABYTE PETA +#define BYTES_TO_STORAGE_UNIT(x, unit) (((double) x) / unit) +#define BYTES_TO_KILOBYTES(x) BYTES_TO_STORAGE_UNIT(x, KILOBYTE) +#define BYTES_TO_MEGABYTES(x) BYTES_TO_STORAGE_UNIT(x, MEGABYTE) +#define BYTES_TO_GIGABYTES(x) BYTES_TO_STORAGE_UNIT(x, GIGABYTE) +#define BYTES_TO_TERABYTES(x) BYTES_TO_STORAGE_UNIT(x, TERABYTE) +#define BYTES_TO_PETABYTES(x) BYTES_TO_STORAGE_UNIT(x, PETABYTE) + #define USECOND 1000000 #endif diff --git a/dttools/src/path_disk_size_info.c b/dttools/src/path_disk_size_info.c index 55b8b401a1..79c7cce49f 100644 --- a/dttools/src/path_disk_size_info.c +++ b/dttools/src/path_disk_size_info.c @@ -23,7 +23,7 @@ struct DIR_with_name { char *name; }; -int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files) +int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files, struct hash_table *exclude_paths) { struct stat info; @@ -31,7 +31,7 @@ int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *n if (result == 0) { if (S_ISDIR(info.st_mode)) { struct path_disk_size_info *state = NULL; - result = path_disk_size_info_get_r(path, -1, &state); + result = path_disk_size_info_get_r(path, -1, &state, exclude_paths); *measured_size = state->last_byte_size_complete; *number_of_files = state->last_file_count_complete; @@ -46,7 +46,7 @@ int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *n return result; } -int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state) +int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state, struct hash_table *exclude_paths) { int64_t start_time = time(0); int result = 0; @@ -115,6 +115,10 @@ int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_di snprintf(composed_path, PATH_MAX, "%s/%s", tail->name, entry->d_name); } + if (exclude_paths && 
hash_table_lookup(exclude_paths, composed_path)) { + continue; + } + if (lstat(composed_path, &file_info) < 0) { if (errno == ENOENT) { /* our DIR structure is stale, and a file went away. We simply do nothing. */ diff --git a/dttools/src/path_disk_size_info.h b/dttools/src/path_disk_size_info.h index f2c2f2d674..2208b962b0 100644 --- a/dttools/src/path_disk_size_info.h +++ b/dttools/src/path_disk_size_info.h @@ -9,6 +9,7 @@ See the file COPYING for details. #include "int_sizes.h" #include "list.h" +#include "hash_table.h" struct path_disk_size_info { int complete_measurement; @@ -29,9 +30,10 @@ Query disk space on the given directory. @param path Directory to be measured. @param *measured_size A pointer to an integer that will be filled with the total space in bytes. @param *number_of_files A pointer to an integer that will be filled with the total number of files, directories, and symbolic links. +@param exclude_paths Hash table with strings of paths that should not be measured. Values of the hash table are ignored. @return zero on success, -1 if an error is encounterd (see errno). */ -int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files); +int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files, struct hash_table *exclude_paths); /** Get a (perhaps partial) disk usage on path, but measure by max_secs at a time. If *state is NULL, start a new measurement, otherwise continue from @@ -40,9 +42,11 @@ When the function returns, if *state->complete_measurement is 1, then the measur @param path Directory to be measured. @param max_secs Maximum number of seconds to spend in the measurement. @param *state State of the measurement. +@param exclude_paths Hash table with strings of paths that should not be measured. Values of the hash table are ignored. @return zero on success, -1 if an error is encounterd (see errno). 
*/ -int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state); +int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state, struct hash_table *exclude_paths); + void path_disk_size_info_delete_state(struct path_disk_size_info *state); diff --git a/dttools/src/rmonitor_poll.c b/dttools/src/rmonitor_poll.c index 9a25a2b626..532e5fb5d2 100644 --- a/dttools/src/rmonitor_poll.c +++ b/dttools/src/rmonitor_poll.c @@ -780,7 +780,7 @@ int rmonitor_get_wd_usage(struct rmonitor_wdir_info *d, int max_time_for_measure { /* We need a pointer to a pointer, which it is not possible from a struct. Use a dummy variable. */ struct path_disk_size_info *state = d->state; - int status = path_disk_size_info_get_r(d->path, max_time_for_measurement, &state); + int status = path_disk_size_info_get_r(d->path, max_time_for_measurement, &state, NULL); d->state = state; @@ -945,7 +945,7 @@ struct rmsummary *rmonitor_measure_host(char *path) struct rmsummary *tr = rmsummary_create(-1); if (path) { - path_disk_size_info_get(path, &total_disk, &file_count); + path_disk_size_info_get(path, &total_disk, &file_count, NULL); tr->disk = ((double)total_disk) / ONE_MEGABYTE; tr->total_files = file_count; } diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index cdcfaf6ec3..f74e2d8f9a 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -105,7 +105,8 @@ typedef enum { VINE_RESULT_FIXED_LOCATION_MISSING = 10 << 3, /**< The task failed because no worker could satisfy the fixed location input file requirements. */ VINE_RESULT_CANCELLED = 11 << 3, /**< The task was cancelled by the caller. */ - VINE_RESULT_LIBRARY_EXIT = 12 << 3 /**< Task is a library that has terminated. **/ + VINE_RESULT_LIBRARY_EXIT = 12 << 3, /**< Task is a library that has terminated. **/ + VINE_RESULT_SANDBOX_EXHAUSTION = 13 << 3 /**< The task used more disk than the allowed sandbox. 
**/ } vine_result_t; /** Select how to allocate resources for similar tasks with @ref vine_set_category_mode */ @@ -1088,6 +1089,21 @@ int vine_enable_peer_transfers(struct vine_manager *m); /** Disable taskvine peer transfers to be scheduled by the manager **/ int vine_disable_peer_transfers(struct vine_manager *m); +/** When enabled, resources are assigned to tasks in proportion to the size +of the worker. If a resource is specified (e.g. with @ref vine_task_set_cores), +proportional resources never go below explicit specifications. This mode is most +useful when only some of the resources are explicitly specified, or +with automatic resource allocation. By default it is enabled. +@param m A manager object + **/ +int vine_enable_proportional_resources(struct vine_manager *m); + +/** Disable proportional resources. See @ref vine_enable_proportional_resources. + * Proportional resources are enabled by default. +@param m A manager object + **/ +int vine_disable_proportional_resources(struct vine_manager *m); + /** Set the minimum task_id of future submitted tasks. Further submitted tasks are guaranteed to have a task_id larger or equal to minid. This function is useful to make task_ids consistent in a workflow that diff --git a/taskvine/src/manager/vine_file_replica_table.c b/taskvine/src/manager/vine_file_replica_table.c index d577d5def7..1395658331 100644 --- a/taskvine/src/manager/vine_file_replica_table.c +++ b/taskvine/src/manager/vine_file_replica_table.c @@ -4,6 +4,8 @@ Copyright (C) 2022- The University of Notre Dame See the file COPYING for details. 
*/ +#include <math.h> + #include "vine_file_replica_table.h" #include "set.h" #include "vine_blocklist.h" @@ -25,6 +27,12 @@ int vine_file_replica_table_insert(struct vine_manager *m, struct vine_worker_in w->inuse_cache += replica->size; hash_table_insert(w->current_files, cachename, replica); + double prev_available = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache - replica->size)); + if (prev_available >= m->current_max_worker->disk) { + /* the current worker may have been the one with the maximum available space, so we update it. */ + m->current_max_worker->disk = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache)); + } + struct set *workers = hash_table_lookup(m->file_worker_table, cachename); if (!workers) { workers = set_create(4); @@ -44,6 +52,12 @@ struct vine_file_replica *vine_file_replica_table_remove(struct vine_manager *m, w->inuse_cache -= replica->size; } + double available = w->resources->disk.total - BYTES_TO_MEGABYTES(w->inuse_cache); + if (available > m->current_max_worker->disk) { + /* the current worker has more space than we knew before for all workers, so we update it. 
*/ + m->current_max_worker->disk = available; + } + struct set *workers = hash_table_lookup(m->file_worker_table, cachename); if (workers) { diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 6c508354a4..2be3d275a2 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -500,7 +500,7 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v struct vine_task *t; int task_status, exit_status; uint64_t task_id; - int64_t output_length, bytes_sent; + int64_t output_length, bytes_sent, sandbox_used; timestamp_t execution_time, start_time, end_time; timestamp_t observed_execution_time; @@ -508,13 +508,14 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v // Format: task completion status, exit status (exit code or signal), output length, bytes_sent, execution time, // task_id int n = sscanf(line, - "complete %d %d %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 "", + "complete %d %d %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 "", &task_status, &exit_status, &output_length, &bytes_sent, &start_time, &end_time, + &sandbox_used, &task_id); if (n < 7) { @@ -591,6 +592,14 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v } } + t->sandbox_measured = sandbox_used; + + /* Update category disk info */ + struct category *c = vine_category_lookup_or_create(q, t->category); + if (sandbox_used > c->min_vine_sandbox) { + c->min_vine_sandbox = sandbox_used; + } + hash_table_insert(q->workers_with_complete_tasks, w->hashkey, w); } @@ -1293,6 +1302,9 @@ static int fetch_outputs_from_worker(struct vine_manager *q, struct vine_worker_ resource_monitor_compress_logs(q, t); } + // fill in measured disk as it comes from a different info source. + t->resources_measured->disk = MAX(t->resources_measured->disk, t->sandbox_measured); + // Finish receiving output. 
t->time_when_done = timestamp_get(); @@ -2507,21 +2519,20 @@ static int build_poll_table(struct vine_manager *q) return n; } -/* - * Use declared dependencies to estimate the minimum disk requriement of a task - */ -static void vine_manager_estimate_task_disk_min(struct vine_manager *q, struct vine_task *t) +static void vine_manager_compute_input_size(struct vine_manager *q, struct vine_task *t) { - int mb = 0; + t->input_files_size = -1; + + int64_t input_size = 0; struct vine_mount *m; LIST_ITERATE(t->input_mounts, m) { - mb += (m->file->size) / 1e6; - } - if (mb > 0) { - struct category *c = category_lookup_or_create(q->categories, vine_task_get_category(t)); - c->min_allocation->disk = MAX(mb, c->min_allocation->disk); + if (m->file->state == VINE_FILE_STATE_CREATED) { + input_size += m->file->size; + } } + + t->input_files_size = (int64_t)ceil(((double)input_size) / ONE_MEGABYTE); } /* @@ -2547,50 +2558,80 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q, return limits; } - if (t->resources_requested->disk < 0) { - vine_manager_estimate_task_disk_min(q, t); + if (t->input_files_size < 0) { + vine_manager_compute_input_size(q, t); } /* Compute the minimum and maximum resources for this task. */ const struct rmsummary *min = vine_manager_task_resources_min(q, t); const struct rmsummary *max = vine_manager_task_resources_max(q, t); + /* available disk for all sandboxes */ + int64_t available_disk = w->resources->disk.total - BYTES_TO_MEGABYTES(w->inuse_cache); + + /* do not count the size of input files as available. + * TODO: efficiently discount the size of files already at worker. 
*/ + if (t->input_files_size > 0) { + available_disk -= t->input_files_size; + } + rmsummary_merge_override_basic(limits, max); int use_whole_worker = 1; + int proportional_whole_tasks = q->proportional_whole_tasks; + if (t->resources_requested->memory > -1 || t->resources_requested->disk > -1) { + /* if mem or disk are specified explicitely, do not expand resources to fill an integer number of tasks. With this, + * the task is assigned exactly the memory and disk specified. We do not do this for cores and gpus, as the use case + * here is to specify the number of cores and allocated the rest of the resources evenly. */ + proportional_whole_tasks = 0; + } + /* Proportionally assign the worker's resources to the task if configured. */ if (q->proportional_resources) { /* Compute the proportion of the worker the task shall have across resource types. */ double max_proportion = -1; + double min_proportion = -1; + if (w->resources->cores.total > 0) { max_proportion = MAX(max_proportion, limits->cores / w->resources->cores.total); + min_proportion = MAX(min_proportion, min->cores / w->resources->cores.total); } if (w->resources->memory.total > 0) { max_proportion = MAX(max_proportion, limits->memory / w->resources->memory.total); + min_proportion = MAX(min_proportion, min->memory / w->resources->memory.total); } - if (w->resources->disk.total > 0) { - max_proportion = MAX(max_proportion, limits->disk / w->resources->disk.total); + if (available_disk > 0) { + max_proportion = MAX(max_proportion, limits->disk / available_disk); + min_proportion = MAX(min_proportion, min->disk / available_disk); } if (w->resources->gpus.total > 0) { max_proportion = MAX(max_proportion, limits->gpus / w->resources->gpus.total); + min_proportion = MAX(min_proportion, min->gpus / w->resources->gpus.total); } - /* If max_proportion > 1, then the task does not fit the worker for the + /* If a max_proportion was defined, it cannot be less than a proportion using the minimum + * resources for the 
category. If it was defined, then the min_proportion is not relevant as the + * task will try to use the whole worker. */ + if (max_proportion != -1) { + max_proportion = MAX(max_proportion, min_proportion); + } + + /* If max_proportion or min_proportion > 1, then the task does not fit the worker for the * specified resources. For the unspecified resources we use the whole * worker as not to trigger a warning when checking for tasks that can't * run on any available worker. */ - if (max_proportion > 1) { + if (max_proportion > 1 || min_proportion > 1) { use_whole_worker = 1; } else if (max_proportion > 0) { use_whole_worker = 0; // adjust max_proportion so that an integer number of tasks fit the worker. - if (q->proportional_whole_tasks) { + if (proportional_whole_tasks) { max_proportion = 1.0 / (floor(1.0 / max_proportion)); } @@ -2610,21 +2651,21 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q, limits->memory = MAX(1, MAX(limits->memory, floor(w->resources->memory.total * max_proportion))); - /* worker's disk is shared even among tasks that are not running, + /* worker's disk is shared evenly among tasks that are not running, * thus the proportion is modified by the current overcommit * multiplier */ - limits->disk = MAX(1, MAX(limits->disk, floor((w->resources->disk.total - w->resources->disk.inuse) * max_proportion / q->resource_submit_multiplier))); + limits->disk = MAX(1, MAX(limits->disk, floor(available_disk * max_proportion / q->resource_submit_multiplier))); } } - /* If no resource was specified, using whole worker. */ + /* If no resource was specified, use whole worker. */ if (limits->cores < 1 && limits->gpus < 1 && limits->memory < 1 && limits->disk < 1) { use_whole_worker = 1; } /* At least one specified resource would use the whole worker, thus * using whole worker for all unspecified resources. 
*/ if ((limits->cores > 0 && limits->cores >= w->resources->cores.total) || (limits->gpus > 0 && limits->gpus >= w->resources->gpus.total) || - (limits->memory > 0 && limits->memory >= w->resources->memory.total) || (limits->disk > 0 && limits->disk >= w->resources->disk.total)) { + (limits->memory > 0 && limits->memory >= w->resources->memory.total) || (limits->disk > 0 && limits->disk >= available_disk)) { use_whole_worker = 1; } @@ -2645,7 +2686,7 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q, } if (limits->disk <= 0) { - limits->disk = w->resources->disk.total; + limits->disk = available_disk; } } else if (vine_schedule_in_ramp_down(q)) { /* if in ramp down, use all the free space of that worker. note that we don't use @@ -2658,21 +2699,14 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q, } limits->memory = w->resources->memory.total - w->resources->memory.inuse; - limits->disk = w->resources->disk.total - w->resources->disk.inuse; + limits->disk = available_disk; } /* never go below specified min resources. */ rmsummary_merge_max(limits, min); - /* If the user specified resources, override proportional calculation */ - if (t->resources_requested->cores > 0) - limits->cores = t->resources_requested->cores; - if (t->resources_requested->memory > 0) - limits->memory = t->resources_requested->memory; - if (t->resources_requested->disk > 0) - limits->disk = t->resources_requested->disk; - if (t->resources_requested->gpus > 0) - limits->gpus = t->resources_requested->gpus; + /* assume the user knows what they are doing... 
*/ + rmsummary_merge_override_basic(limits, t->resources_requested); return limits; } @@ -2751,12 +2785,7 @@ static void count_worker_resources(struct vine_manager *q, struct vine_worker_in w->resources->gpus.inuse += box->gpus; } - char *cachename; - struct vine_file_replica *replica; - HASH_TABLE_ITERATE(w->current_files, cachename, replica) - { - w->resources->disk.inuse += ((double)replica->size) / 1e6; - } + w->resources->disk.inuse += ceil(BYTES_TO_MEGABYTES(w->inuse_cache)); } static void update_max_worker(struct vine_manager *q, struct vine_worker_info *w) @@ -2776,8 +2805,8 @@ static void update_max_worker(struct vine_manager *q, struct vine_worker_info *w q->current_max_worker->memory = w->resources->memory.total; } - if (q->current_max_worker->disk < w->resources->disk.total) { - q->current_max_worker->disk = w->resources->disk.total; + if (q->current_max_worker->disk < (w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache)))) { + q->current_max_worker->disk = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache)); } if (q->current_max_worker->gpus < w->resources->gpus.total) { @@ -2917,6 +2946,37 @@ static int resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worke return 0; } +/* 1 if task resubmitted, 0 otherwise */ +static int resubmit_task_on_sandbox_exhaustion(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t) +{ + if (t->result != VINE_RESULT_SANDBOX_EXHAUSTION) { + return 0; + } + + struct category *c = vine_category_lookup_or_create(q, t->category); + + /* on sandbox exhausted, the resources allocated correspond to the overflown sandbox */ + double sandbox = t->resources_allocated->disk; + + /* grow sandbox by given factor (default is two) */ + sandbox *= q->sandbox_grow_factor; + + /* take the MAX in case min_vine_sandbox was updated before the result of this task was processed */ + c->min_vine_sandbox = MAX(c->min_vine_sandbox, sandbox); + + debug(D_VINE, "Task %d exhausted disk sandbox on %s (%s).\n", t->task_id, 
w->hostname, w->addrport); + double max_allowed_disk = MAX(t->resources_requested->disk, c->max_allocation->disk); + + if (max_allowed_disk > -1 && c->min_vine_sandbox > max_allowed_disk) { + debug(D_VINE, "Task %d failed given max disk limit for sandbox.\n", t->task_id); + return 0; + } + + change_task_state(q, t, VINE_TASK_READY); + + return 1; +} + static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t) { /* in this function, any change_task_state should only be to VINE_TASK_READY */ @@ -2942,6 +3002,9 @@ static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w case VINE_RESULT_RESOURCE_EXHAUSTION: return resubmit_task_on_exhaustion(q, w, t); break; + case VINE_RESULT_SANDBOX_EXHAUSTION: + return resubmit_task_on_sandbox_exhaustion(q, w, t); + break; default: /* by default tasks are not resumitted */ return 0; @@ -3771,6 +3834,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->current_max_worker = rmsummary_create(-1); q->max_task_resources_requested = rmsummary_create(-1); + q->sandbox_grow_factor = 2.0; + q->stats = calloc(1, sizeof(struct vine_stats)); q->stats_measure = calloc(1, sizeof(struct vine_stats)); @@ -3931,6 +3996,22 @@ int vine_disable_peer_transfers(struct vine_manager *q) return 1; } +int vine_enable_proportional_resources(struct vine_manager *q) +{ + debug(D_VINE, "Proportional resources enabled"); + q->proportional_resources = 1; + q->proportional_whole_tasks = 1; + return 1; +} + +int vine_disable_proportional_resources(struct vine_manager *q) +{ + debug(D_VINE, "Proportional resources disabled"); + q->proportional_resources = 0; + q->proportional_whole_tasks = 0; + return 1; +} + int vine_enable_disconnect_slow_workers_category(struct vine_manager *q, const char *category, double multiplier) { struct category *c = vine_category_lookup_or_create(q, category); @@ -4232,6 +4313,9 @@ char *vine_monitor_wrap(struct vine_manager *q, struct 
vine_worker_info *w, stru buffer_printf(&b, " --interval %d", q->monitor_interval); } + /* disable disk as it is measured throught the sandbox, otherwise we end up measuring twice. */ + buffer_printf(&b, " --without-disk-footprint"); + int extra_files = (q->monitor_mode & VINE_MON_FULL); char *monitor_cmd = resource_monitor_write_command("./" RESOURCE_MONITOR_REMOTE_NAME, @@ -4422,6 +4506,9 @@ const char *vine_result_string(vine_result_t result) case VINE_RESULT_LIBRARY_EXIT: str = "LIBRARY_EXIT"; break; + case VINE_RESULT_SANDBOX_EXHAUSTION: + str = "SANDBOX_EXHAUSTION"; + break; } return str; @@ -5075,6 +5162,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout, timestamp_t current_time = timestamp_get(); if (current_time - q->time_last_large_tasks_check >= q->large_task_check_interval) { q->time_last_large_tasks_check = current_time; + find_max_worker(q); vine_schedule_check_for_large_tasks(q); } @@ -5380,7 +5468,11 @@ int vine_tune(struct vine_manager *q, const char *name, double value) q->prefer_dispatch = !!((int)value); } else if (!strcmp(name, "force-proportional-resources") || !strcmp(name, "proportional-resources")) { - q->proportional_resources = MAX(0, (int)value); + if (value > 0) { + vine_enable_proportional_resources(q); + } else { + vine_disable_proportional_resources(q); + } } else if (!strcmp(name, "force-proportional-resources-whole-tasks") || !strcmp(name, "proportional-whole-tasks")) { q->proportional_whole_tasks = MAX(0, (int)value); @@ -5448,8 +5540,13 @@ int vine_tune(struct vine_manager *q, const char *name, double value) } else if (!strcmp(name, "option-blocklist-slow-workers-timeout")) { q->option_blocklist_slow_workers_timeout = MAX(0, value); /*todo: confirm 0 or 1*/ + } else if (!strcmp(name, "watch-library-logfiles")) { q->watch_library_logfiles = !!((int)value); + + } else if (!strcmp(name, "sandbox-grow-factor")) { + q->sandbox_grow_factor = MAX(1.1, value); + } else { debug(D_NOTICE | D_VINE, 
"Warning: tuning parameter \"%s\" not recognized\n", name); return -1; @@ -5733,6 +5830,7 @@ void vine_accumulate_task(struct vine_manager *q, struct vine_task *t) case VINE_RESULT_RESOURCE_EXHAUSTION: case VINE_RESULT_MAX_WALL_TIME: case VINE_RESULT_OUTPUT_TRANSFER_ERROR: + case VINE_RESULT_SANDBOX_EXHAUSTION: if (category_accumulate_summary(c, t->resources_measured, q->current_max_worker)) { vine_txn_log_write_category(q, c); } @@ -5752,11 +5850,15 @@ void vine_accumulate_task(struct vine_manager *q, struct vine_task *t) break; case VINE_RESULT_INPUT_MISSING: case VINE_RESULT_OUTPUT_MISSING: + case VINE_RESULT_FIXED_LOCATION_MISSING: + case VINE_RESULT_CANCELLED: + case VINE_RESULT_RMONITOR_ERROR: + case VINE_RESULT_STDOUT_MISSING: case VINE_RESULT_MAX_END_TIME: case VINE_RESULT_UNKNOWN: case VINE_RESULT_FORSAKEN: case VINE_RESULT_MAX_RETRIES: - default: + case VINE_RESULT_LIBRARY_EXIT: break; } } @@ -5839,7 +5941,6 @@ int vine_enable_category_resource(struct vine_manager *q, const char *category, const struct rmsummary *vine_manager_task_resources_max(struct vine_manager *q, struct vine_task *t) { - struct category *c = vine_category_lookup_or_create(q, t->category); return category_task_max_resources(c, t->resources_requested, t->resource_request, t->task_id); diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index 0ecc2d7bc7..8f753769f8 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -218,6 +218,8 @@ struct vine_manager { int max_library_retries; /* The maximum time that a library can be failed and retry another one, if over this count the library template will be removed */ int watch_library_logfiles; /* If true, watch the output files produced by each of the library processes running on the remote workers, take them back the current logging directory */ + double sandbox_grow_factor; /* When task disk sandboxes are exhausted, increase the allocation using their measured valued 
times this factor */ + /*todo: confirm datatype. int or int64*/ int max_task_stdout_storage; /* Maximum size of standard output from task. (If larger, send to a separate file.) */ int max_new_workers; /* Maximum number of workers to add in a single cycle before dealing with other matters. */ diff --git a/taskvine/src/manager/vine_protocol.h b/taskvine/src/manager/vine_protocol.h index bda88e43b4..7a3dca9fcd 100644 --- a/taskvine/src/manager/vine_protocol.h +++ b/taskvine/src/manager/vine_protocol.h @@ -13,7 +13,7 @@ worker, and catalog, but should not be visible to the public user API. #ifndef VINE_PROTOCOL_H #define VINE_PROTOCOL_H -#define VINE_PROTOCOL_VERSION 10 +#define VINE_PROTOCOL_VERSION 11 #define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */ diff --git a/taskvine/src/manager/vine_schedule.c b/taskvine/src/manager/vine_schedule.c index 4dc3fccc16..1127e1e636 100644 --- a/taskvine/src/manager/vine_schedule.c +++ b/taskvine/src/manager/vine_schedule.c @@ -14,6 +14,7 @@ See the file COPYING for details. #include "debug.h" #include "hash_table.h" #include "list.h" +#include "macros.h" #include "rmonitor_types.h" #include "rmsummary.h" @@ -132,6 +133,31 @@ int check_worker_have_enough_resources(struct vine_manager *q, struct vine_worke return ok; } +/* t->disk only specifies the size of output and ephemeral files. Here we check if the task would fit together with all its input files + * taking into account that some files may be already at the worker. 
*/ +int check_worker_have_enough_disk_with_inputs(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t) +{ + int ok = 1; + double available = w->resources->disk.total - MAX(0, t->resources_requested->disk) - w->resources->disk.inuse; + + struct vine_mount *m; + LIST_ITERATE(t->input_mounts, m) + { + if (hash_table_lookup(w->current_files, m->file->cached_name)) { + continue; + } + + available -= m->file->size; + + if (available < 0) { + ok = 0; + break; + } + } + + return ok; +} + /* Check if this task is compatible with this given worker by considering * resources availability, features, blocklist, and all other relevant factors. * Used by all scheduling methods for basic compatibility. @@ -147,7 +173,7 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w /* Otherwise library templates are modified during the run. */ /* worker has not reported any resources yet */ - if (w->resources->tag < 0 || w->resources->workers.total < 1) { + if (w->resources->tag < 0 || w->resources->workers.total < 1 || w->end_time < 0) { return 0; } @@ -155,6 +181,10 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w if (w->draining) { return 0; } + // if worker's end time has not been received + if (w->end_time < 0) { + return 0; + } /* Don't send tasks if a task recently failed at this worker.
*/ if (w->last_failure_time + q->transient_error_interval > timestamp_get()) { @@ -182,11 +212,6 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w } rmsummary_delete(l); - // if worker's end time has not been received - if (w->end_time < 0) { - return 0; - } - // if wall time for worker is specified and there's not enough time for task, then not ok if (w->end_time > 0) { double current_time = ((double)timestamp_get()) / ONE_SECOND; @@ -198,6 +223,10 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w } } + if (!check_worker_have_enough_disk_with_inputs(q, w, t)) { + return 0; + } + /* If the worker is not the one the task wants. */ if (t->has_fixed_locations && !check_fixed_location_worker(q, w, t)) { return 0; diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index 0813b6c1bd..a2771ab1ee 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -72,6 +72,7 @@ struct vine_task *vine_task_create(const char *command_line) t->resources_measured = rmsummary_create(-1); t->resources_allocated = rmsummary_create(-1); t->current_resource_box = 0; + t->input_files_size = -1; t->refcount = 1; t->output_received = 0; diff --git a/taskvine/src/manager/vine_task.h b/taskvine/src/manager/vine_task.h index bd882e13ef..c8a40217c4 100644 --- a/taskvine/src/manager/vine_task.h +++ b/taskvine/src/manager/vine_task.h @@ -64,6 +64,7 @@ struct vine_task { int max_retries; /**< Number of times the task is tried to be executed on some workers until success. If less than one, the task is retried indefinitely. See try_count below.*/ int max_forsaken; /**< Number of times the task is submitted to workers without being executed. If less than one, the task is retried indefinitely. See forsaken_count below.*/ int64_t min_running_time; /**< Minimum time (in seconds) the task needs to run. 
(see vine_worker --wall-time)*/ + int64_t input_files_size; /**< Size (in bytes) of input files. < 0 if the size of at least one of the input files is unknown. */ /***** Internal state of task as it works towards completion. *****/ @@ -121,6 +122,8 @@ struct vine_task { struct rmsummary *resources_measured; /**< When monitoring is enabled, it points to the measured resources used by the task in its latest attempt. */ struct rmsummary *resources_requested; /**< Number of cores, disk, memory, time, etc. the task requires. */ struct rmsummary *current_resource_box; /**< Resources allocated to the task on this specific worker. */ + + double sandbox_measured; /**< On completion, the maximum size observed of the disk used by the task for output and ephemeral files. */ int has_fixed_locations; /**< Whether at least one file was added with the VINE_FIXED_LOCATION flag. Task fails immediately if no worker can satisfy all the strict inputs of the task. */ diff --git a/taskvine/src/worker/vine_cache_file.c b/taskvine/src/worker/vine_cache_file.c index dddf9357a0..3e364b3319 100644 --- a/taskvine/src/worker/vine_cache_file.c +++ b/taskvine/src/worker/vine_cache_file.c @@ -119,7 +119,7 @@ int vine_cache_file_measure_metadata(const char *path, int *mode, int64_t *size, return 0; /* Measure the size of the item recursively, if a directory. */ - result = path_disk_size_info_get(path, size, &nfiles); + result = path_disk_size_info_get(path, size, &nfiles, NULL); if (result < 0) return 0; diff --git a/taskvine/src/worker/vine_process.c b/taskvine/src/worker/vine_process.c index af56468eac..619f4cfa6a 100644 --- a/taskvine/src/worker/vine_process.c +++ b/taskvine/src/worker/vine_process.c @@ -13,6 +13,7 @@ See the file COPYING for details. #include "vine_file.h" #include "vine_mount.h" #include "vine_worker.h" +#include "vine_cache.h" #include "change_process_title.h" #include "create_dir.h" @@ -48,6 +49,8 @@ See the file COPYING for details. 
#include #include +extern struct vine_cache *cache_manager; + /* Give the letter code used for the process sandbox dir. */ @@ -578,13 +581,15 @@ void vine_process_compute_disk_needed(struct vine_process *p) } } +static int vine_process_sandbox_disk_measure(struct vine_process *p, int max_time_on_measurement, struct path_disk_size_info **state); + int vine_process_measure_disk(struct vine_process *p, int max_time_on_measurement) { /* we can't have pointers to struct members, thus we create temp variables here */ struct path_disk_size_info *state = p->disk_measurement_state; - int result = path_disk_size_info_get_r(p->sandbox, max_time_on_measurement, &state); + int result = vine_process_sandbox_disk_measure(p, max_time_on_measurement, &state); /* not a memory leak... Either disk_measurement_state was NULL or the same as state. */ p->disk_measurement_state = state; @@ -600,4 +605,31 @@ int vine_process_measure_disk(struct vine_process *p, int max_time_on_measuremen return result; } +static int vine_process_sandbox_disk_measure(struct vine_process *p, int max_secs, struct path_disk_size_info **state) +{ + int num_inputs = 0; + if (p->task->input_mounts) { + num_inputs = list_size(p->task->input_mounts); + } + + struct hash_table *exclude_paths = NULL; + if (num_inputs > 0) { + exclude_paths = hash_table_create(2 * num_inputs, 0); + + struct vine_mount *m; + LIST_ITERATE(p->task->input_mounts, m) + { + hash_table_insert(exclude_paths, m->remote_name, (void *)1); + } + } + + int result = path_disk_size_info_get_r(p->sandbox, max_secs, state, exclude_paths); + + if (exclude_paths) { + hash_table_delete(exclude_paths); + } + + return result; +} + /* vim: set noexpandtab tabstop=4: */ diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index bf1f2f2e07..1c0975fbc4 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -286,25 +286,27 @@ void send_complete_tasks(struct link *l) output[p->output_length] = 
'\0'; close(output_file); send_async_message(l, - "complete %d %d %lld %lld %llu %llu %d\n%s", + "complete %d %d %lld %lld %llu %llu %d %d\n%s", p->result, p->exit_code, (long long)p->output_length, (long long)p->output_length, (unsigned long long)p->execution_start, (unsigned long long)p->execution_end, + p->sandbox_size, p->task->task_id, output); free(output); } else { send_async_message(l, - "complete %d %d %lld %lld %llu %llu %d\n", + "complete %d %d %lld %lld %llu %llu %d %d\n", p->result, p->exit_code, (long long)p->output_length, 0, (unsigned long long)p->execution_start, (unsigned long long)p->execution_end, + p->sandbox_size, p->task->task_id); } } @@ -350,7 +352,7 @@ static int64_t measure_worker_disk() return 0; char *cache_dir = vine_cache_data_path(cache_manager, "."); - path_disk_size_info_get_r(cache_dir, options->max_time_on_measurement, &state); + path_disk_size_info_get_r(cache_dir, options->max_time_on_measurement, &state, NULL); free(cache_dir); int64_t disk_measured = 0; @@ -1100,7 +1102,7 @@ static void kill_all_tasks() /* Check whether a given process is still within the various limits imposed on it. */ -static int enforce_process_limits(struct vine_process *p) +static int enforce_process_sanbox_limits(struct vine_process *p) { /* If the task did not set disk usage, return right away. */ if (p->task->resources_requested->disk < 1) @@ -1121,7 +1123,7 @@ static int enforce_process_limits(struct vine_process *p) /* Check all processes to see whether they have exceeded various limits, and kill if necessary. 
*/ -static int enforce_processes_limits() +static int enforce_processes_sandbox_limits() { static time_t last_check_time = 0; @@ -1136,8 +1138,8 @@ static int enforce_processes_limits() ITABLE_ITERATE(procs_running, task_id, p) { - if (!enforce_process_limits(p)) { - finish_running_task(p, VINE_RESULT_RESOURCE_EXHAUSTION); + if (!enforce_process_sanbox_limits(p)) { + finish_running_task(p, VINE_RESULT_SANDBOX_EXHAUSTION); trash_file(p->sandbox); ok = 0; @@ -1682,7 +1684,7 @@ static void vine_worker_serve_manager(struct link *manager) /* end a running processes if goes above its declared limits. * Mark offending process as RESOURCE_EXHASTION. */ - enforce_processes_limits(); + enforce_processes_sandbox_limits(); /* end running processes if worker resources are exhasusted, and marked * them as FORSAKEN, so they can be resubmitted somewhere else. */ diff --git a/taskvine/test/TR_vine_single.sh b/taskvine/test/TR_vine_single.sh index 1a0e1de2c5..0b0b41caf3 100755 --- a/taskvine/test/TR_vine_single.sh +++ b/taskvine/test/TR_vine_single.sh @@ -1,6 +1,7 @@ #!/bin/sh CORES=1 +DISK=5000 TASKS=10 . 
./vine_common.sh diff --git a/taskvine/test/vine_allocations.py b/taskvine/test/vine_allocations.py index 783762b9e4..b09e7a0cf3 100755 --- a/taskvine/test/vine_allocations.py +++ b/taskvine/test/vine_allocations.py @@ -77,7 +77,7 @@ def check_task(category, category_mode, max, min, expected): q.tune("force-proportional-resources", 1) check_task("only_memory", "fixed", max={"memory": worker_memory / 2}, min={}, expected={"cores": worker_cores / 2, "memory": worker_memory / 2, "disk": worker_disk / 2, "gpus": 0}) - check_task("only_memory_w_minimum", "fixed", max={"memory": worker_memory / 2}, min={"cores": 3, "gpus": 2}, expected={"cores": 3, "memory": worker_memory / 2, "disk": worker_disk / 2, "gpus": 2}) + check_task("only_memory_w_minimum", "fixed", max={"memory": worker_memory / 2}, min={"cores": 3, "gpus": 2}, expected={"cores": 4, "memory": worker_memory, "disk": worker_disk, "gpus": 2}) check_task("only_cores", "fixed", max={"cores": worker_cores}, min={}, expected={"cores": worker_cores, "memory": worker_memory, "disk": worker_disk, "gpus": 0}) diff --git a/taskvine/test/vine_common.sh b/taskvine/test/vine_common.sh index d42b4cb2bd..c394ecf6a2 100755 --- a/taskvine/test/vine_common.sh +++ b/taskvine/test/vine_common.sh @@ -26,7 +26,7 @@ EOF port=`cat master.port` echo "starting worker" - ../src/worker/vine_worker -o worker.log -d all localhost $port -b 1 --timeout 20 --cores $CORES --memory 50 --single-shot + ../src/worker/vine_worker -o worker.log -d all localhost $port -b 1 --timeout 20 --cores ${CORES:-1} --memory ${MEMORY:-250} --disk ${DISK:-2000} --single-shot echo "checking for output" i=0 diff --git a/work_queue/src/work_queue_process.c b/work_queue/src/work_queue_process.c index 763b3101c2..e611fe852a 100644 --- a/work_queue/src/work_queue_process.c +++ b/work_queue/src/work_queue_process.c @@ -377,7 +377,7 @@ int work_queue_process_measure_disk(struct work_queue_process *p, int max_time_o struct path_disk_size_info *state = 
p->disk_measurement_state; - int result = path_disk_size_info_get_r(p->sandbox, max_time_on_measurement, &state); + int result = path_disk_size_info_get_r(p->sandbox, max_time_on_measurement, &state, NULL); /* not a memory leak... Either disk_measurement_state was NULL or the same as state. */ p->disk_measurement_state = state; diff --git a/work_queue/src/work_queue_worker.c b/work_queue/src/work_queue_worker.c index 7bb5eb68a3..36c6d3223f 100644 --- a/work_queue/src/work_queue_worker.c +++ b/work_queue/src/work_queue_worker.c @@ -288,7 +288,7 @@ static int64_t measure_worker_disk() { static struct path_disk_size_info *state = NULL; - path_disk_size_info_get_r("./cache", max_time_on_measurement, &state); + path_disk_size_info_get_r("./cache", max_time_on_measurement, &state, NULL); int64_t disk_measured = 0; if(state->last_byte_size_complete >= 0) { From a495f9c5912d0730edd9a06e5dd064fdaf60b943 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 9 Oct 2024 10:24:51 -0400 Subject: [PATCH 2/8] wq: install py example in correctd directory (#3954) --- work_queue/src/bindings/python3/Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/work_queue/src/bindings/python3/Makefile b/work_queue/src/bindings/python3/Makefile index e50d078cfe..52889e71d7 100644 --- a/work_queue/src/bindings/python3/Makefile +++ b/work_queue/src/bindings/python3/Makefile @@ -31,5 +31,5 @@ install: all chmod 755 work_queue_example.py cp ndcctools/work_queue.py ndcctools/work_queue_display.py ndcctools/work_queue_dask.py ndcctools/cwork_queue.py $(WQPYTHONSO) $(CCTOOLS_PYTHON3_PATH)/ndcctools cp work_queue.py $(CCTOOLS_PYTHON3_PATH) - mkdir -p $(CCTOOLS_INSTALL_DIR)/cctools/doc/examples/work_queue/python - cp work_queue_example.py $(CCTOOLS_INSTALL_DIR)/cctools/doc/examples/work_queue/python + mkdir -p $(CCTOOLS_INSTALL_DIR)/doc/cctools/examples/work_queue/python + cp work_queue_example.py $(CCTOOLS_INSTALL_DIR)/doc/cctools/examples/work_queue/python From 
9f0da1f88ca0aacfe73afc8f10a572bcc8de52e7 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Tue, 15 Oct 2024 11:01:34 -0400 Subject: [PATCH 3/8] Poncho pinned (#3955) * correctly handle conda exec arg in poncho package create * use with open for spec * add pinned to spec --- poncho/src/poncho/package_create.py | 39 +++++++++++++++++++++-------- poncho/src/poncho_package_create | 2 +- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/poncho/src/poncho/package_create.py b/poncho/src/poncho/package_create.py index ca7aaeb3a5..e336dbeced 100755 --- a/poncho/src/poncho/package_create.py +++ b/poncho/src/poncho/package_create.py @@ -179,6 +179,15 @@ def dict_to_env( return output +def _write_pinned_files(poncho_spec, env_dir): + if "pinned" in poncho_spec: + if "conda" in poncho_spec["pinned"]: + with open(os.path.join(env_dir, "env", "conda-meta", "pinned"), "w") as f: + for p, v in poncho_spec["pinned"]["conda"].items(): + assert isinstance(v, str) + f.write(f"{p}={v}\n") + + def pack_env_from_dict( spec, output, @@ -210,11 +219,21 @@ def pack_env_from_dict( http_data(spec, env_dir) # create conda environment in temp directory + logger.info("creating environment directory...") + _run_conda_command( + env_dir, + needs_confirmation, + "create", + ) + + _write_pinned_files(spec, env_dir) + + # update conda environment in temp directory from spec logger.info("populating environment...") _run_conda_command( env_dir, needs_confirmation, - "env create", + "env update", "--file", env_dir + "/conda_spec.yml", ) @@ -265,15 +284,15 @@ def pack_env( # else if spec is a file or from stdin elif os.path.isfile(spec) or spec == "-": - f = open(spec, "r") - poncho_spec = json.load(f) - pack_env_from_dict( - poncho_spec, - output, - conda_executable, - download_micromamba, - ignore_editable_packages, - ) + with open(spec, "r") as f: + poncho_spec = json.load(f) + pack_env_from_dict( + poncho_spec, + output, + conda_executable, + download_micromamba, + ignore_editable_packages, + )
# else pack from a conda environment name # this thus assumes conda executable is in the current shell executable path diff --git a/poncho/src/poncho_package_create b/poncho/src/poncho_package_create index d91074e8e3..3243dbd7ff 100755 --- a/poncho/src/poncho_package_create +++ b/poncho/src/poncho_package_create @@ -5,7 +5,7 @@ if __name__ == '__main__': parser = argparse.ArgumentParser(description='Create a packed environment from a spec, a conda environment name, or a conda directory.') parser.add_argument('spec', help='Read in a spec file, a conda environment name, a conda directory, or - for stdin.') parser.add_argument('output', help='Write output from conda-pack to the given file.') - parser.add_argument('--conda-executable', action='store_true', help='Path to conda executable to use. Default are, in this order: mamba, $CONDA_EXE, conda') + parser.add_argument('--conda-executable', action='store', help='Path to conda executable to use. Default are, in this order: mamba, $CONDA_EXE, conda') parser.add_argument('--no-micromamba', action='store_true', help='Do not try no download micromamba if a conda executable is not found.') parser.add_argument('--ignore-editable-packages', action='store_true', help='Skip checks for editable packages.') From 186c2a2c4c390900a0a339e1f7221a0067479d08 Mon Sep 17 00:00:00 2001 From: Saiful Islam Date: Tue, 15 Oct 2024 11:01:49 -0400 Subject: [PATCH 4/8] Added poncho package support to vine_submit_workers (#3952) Issue ##3762 --- taskvine/src/tools/vine_submit_workers | 51 +++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/taskvine/src/tools/vine_submit_workers b/taskvine/src/tools/vine_submit_workers index 83cff94d50..8d3c04989c 100755 --- a/taskvine/src/tools/vine_submit_workers +++ b/taskvine/src/tools/vine_submit_workers @@ -7,6 +7,8 @@ use_manager_name=0 pwfile="" arguments="" +poncho_env="" + host= port= count=1 @@ -95,6 +97,7 @@ show_help() echo " --disk Manually set the amount of disk 
(in MB) reported by this worker." echo " --scratch-dir Set the scratch directory location created on the local machine (default=/tmp/USER-workers)." echo " -E,--worker-options Extra options passed to vine_worker." + echo " --poncho-env Run each worker inside this poncho environment." echo " -h,--help Show this help message." echo "" echo "batch specific options:" @@ -193,6 +196,16 @@ condor_submit_workers_command() { condor_setup + if [ -n "$poncho_env" ]; then + condor_transfer_input_files="${condor_transfer_input_files}, poncho_package_run, $poncho_env" + + executable="poncho_package_run" + arguments="-e $poncho_env -- ./vine_worker $arguments $host $port" + else + executable="vine_worker" + arguments="$arguments $host $port" + fi + if [ -n "${condor_docker_universe}" ] then cat > condor_submit_file <> condor_submit_file </dev/null) @@ -295,7 +314,7 @@ slurm_submit_workers_command() sbatch="echo sbatch" echo "######## worker.sh ########" echo "#!/bin/sh" - echo "${submit_dir}"/vine_worker $arguments $host $port + echo "${worker_command}" echo "####################################" fi @@ -305,7 +324,7 @@ slurm_submit_workers_command() to_submit=$((to_submit-1)) ${sbatch} --job-name vineWorker --ntasks=1 --nodes=1 $slurm_parameters <worker.sh </dev/null) + if [ $? != 0 ] + then + echo "$0: please add 'poncho_package_run' to your PATH." 
1>&2 + exit 1 + fi + + cp "$poncho_bin" "${submit_dir}" + cp "$poncho_env" "${submit_dir}" + fi + cp "${worker}" "${submit_dir}" set_up_password_file From 3b898d47ae251f4b6d7836309fbbb9784e86dc5c Mon Sep 17 00:00:00 2001 From: Saiful Islam Date: Tue, 15 Oct 2024 11:02:07 -0400 Subject: [PATCH 5/8] Added missing help options ins vine_factory (#3950) --- batch_job/src/vine_factory.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/batch_job/src/vine_factory.c b/batch_job/src/vine_factory.c index b397a7d277..a9ff110c02 100644 --- a/batch_job/src/vine_factory.c +++ b/batch_job/src/vine_factory.c @@ -1168,8 +1168,9 @@ static void show_help(const char *cmd) printf(" %-30s Enable debugging for this subsystem.\n", "-d,--debug="); printf(" %-30s Send debugging to this file.\n", "-o,--debug-file="); printf(" %-30s Specify the size of the debug file.\n", "-O,--debug-file-size="); - printf(" %-30s Workers should use SSL to connect to managers. (Not needed if project names.)", "--ssl"); + printf(" %-30s Workers should use SSL to connect to managers. (Not needed if project names.)\n", "--ssl"); printf(" %-30s SNI domain name if different from manager hostname. 
Implies --ssl.\n", "--tls-sni="); + printf(" %-30s Set a custom factory name.\n", "--factory-name"); printf(" %-30s Show the version string.\n", "-v,--version"); printf(" %-30s Show this screen.\n", "-h,--help"); @@ -1198,7 +1199,7 @@ static void show_help(const char *cmd) printf(" %-30s Alternate binary instead of vine_worker.\n", "--worker-binary="); printf(" %-30s Wrap factory with this command prefix.\n","--wrapper"); printf(" %-30s Add this input file needed by the wrapper.\n","--wrapper-input"); - printf(" %-30s Run each worker inside this python environment.\n","--python-env="); + printf(" %-30s Run each worker inside this poncho environment.\n","--poncho-env="); printf("\nOptions specific to batch systems:\n"); printf(" %-30s Generic batch system options.\n", "-B,--batch-options="); From 36f1e3454c38d1e84bc4deee0d30bb79f9bb4c72 Mon Sep 17 00:00:00 2001 From: Saiful Islam Date: Tue, 15 Oct 2024 11:02:26 -0400 Subject: [PATCH 6/8] Fixed poncho doc package create and run issue (#3951) Both specs and output are required in poncho_package_create --- doc/manuals/poncho/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/manuals/poncho/index.md b/doc/manuals/poncho/index.md index 9f14a4ec99..93c70f6c2c 100644 --- a/doc/manuals/poncho/index.md +++ b/doc/manuals/poncho/index.md @@ -63,14 +63,14 @@ This will create `package.json` with contents similar to this: Then to create a complete package from the specification: ``` -poncho_package_create package.json +poncho_package_create package.json package.tar.gz ``` Once created, this package can be moved to another machine for execution. 
Then, to run a program in the environment: ``` -poncho_package_run -e package.tar.gz -- example.py +poncho_package_run -e package.tar.gz -- python example.py ``` ## Specification File From 7efb35424c4f4e616d55a6883c8912a77e76c90a Mon Sep 17 00:00:00 2001 From: Kevin Xue Date: Tue, 15 Oct 2024 11:03:29 -0400 Subject: [PATCH 7/8] Hungry feature (#3949) * tmp commit * debug commit for hungry implementation * hungry implementation * remove limits.h> * lint * removed file * actually removed file * changed list iteration and removed file * removed comment * lint --------- Co-authored-by: Kevin Xue Co-authored-by: Kevin Xue --- hi.txt | 1 + taskvine/src/manager/vine_manager.c | 85 +++++++++++++++++++++-------- 2 files changed, 64 insertions(+), 22 deletions(-) create mode 100644 hi.txt diff --git a/hi.txt b/hi.txt new file mode 100644 index 0000000000..bca70f3531 --- /dev/null +++ b/hi.txt @@ -0,0 +1 @@ +q diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 2be3d275a2..65c61d9660 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -5198,7 +5198,7 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag, // check if workers' resources are available to execute more tasks queue should // have at least MAX(hungry_minimum, hungry_minimum_factor * number of workers) ready tasks //@param: struct vine_manager* - pointer to manager -//@return: 1 if hungry, 0 otherwise +//@return: approximate number of additional tasks if hungry, 0 otherwise int vine_hungry(struct vine_manager *q) { // check if manager is initialized @@ -5210,9 +5210,9 @@ int vine_hungry(struct vine_manager *q) struct vine_stats qstats; vine_get_stats(q, &qstats); - // if number of ready tasks is less than minimum, then queue is hungry - if (qstats.tasks_waiting < MAX(q->hungry_minimum, q->hungry_minimum_factor * hash_table_size(q->worker_table))) { - return 1; + // if number of ready tasks is the queue is 
less than the minimum, then it is hungry + if (qstats.tasks_waiting < q->hungry_minimum) { + return q->hungry_minimum - qstats.tasks_waiting; } // get total available resources consumption (cores, memory, disk, gpus) of all workers of this manager @@ -5221,13 +5221,14 @@ int vine_hungry(struct vine_manager *q) int64_t workers_total_avail_memory = 0; int64_t workers_total_avail_disk = 0; int64_t workers_total_avail_gpus = 0; - - workers_total_avail_cores = overcommitted_resource_total(q, q->stats->total_cores) - q->stats->committed_cores; - workers_total_avail_memory = overcommitted_resource_total(q, q->stats->total_memory) - q->stats->committed_memory; - workers_total_avail_gpus = overcommitted_resource_total(q, q->stats->total_gpus) - q->stats->committed_gpus; - workers_total_avail_disk = q->stats->total_disk - q->stats->committed_disk; // never overcommit disk - - // get required resources (cores, memory, disk, gpus) of one waiting task + // Find available resources (2 * total - committed) + workers_total_avail_cores = 2 * qstats.total_cores - qstats.committed_cores; + workers_total_avail_memory = 2 * qstats.total_memory - qstats.committed_memory; + workers_total_avail_gpus = 2 * qstats.total_gpus - qstats.committed_gpus; + workers_total_avail_disk = 2 * qstats.total_disk - qstats.committed_disk; // never overcommit disk + + // get required resources (cores, memory, disk, gpus) of one (all?) waiting tasks + // seems to iterate through all tasks counted in the queue.
int64_t ready_task_cores = 0; int64_t ready_task_memory = 0; int64_t ready_task_disk = 0; @@ -5235,22 +5236,18 @@ int vine_hungry(struct vine_manager *q) struct vine_task *t; - int count = task_state_count(q, NULL, VINE_TASK_READY); - - while (count > 0) { - count--; - t = list_pop_head(q->ready_list); - + LIST_ITERATE(q->ready_list, t) + { ready_task_cores += MAX(1, t->resources_requested->cores); ready_task_memory += t->resources_requested->memory; ready_task_disk += t->resources_requested->disk; ready_task_gpus += t->resources_requested->gpus; - - list_push_tail(q->ready_list, t); } - // check possible limiting factors - // return false if required resources exceed available resources + int count = task_state_count(q, NULL, VINE_TASK_READY); + + int64_t avg_additional_tasks_cores, avg_additional_tasks_memory, avg_additional_tasks_disk, avg_additional_tasks_gpus; + if (ready_task_cores > workers_total_avail_cores) { return 0; } @@ -5264,7 +5261,51 @@ int vine_hungry(struct vine_manager *q) return 0; } - return 1; // all good + if (ready_task_cores < 0) + ready_task_cores = 0; + if (ready_task_memory < 0) + ready_task_memory = 0; + if (ready_task_disk < 0) + ready_task_disk = 0; + if (ready_task_gpus < 0) + ready_task_gpus = 0; + + if (count != 0) { // each statement counts the available (2*total - committed) and further subtracts the ready/in-queue tasks and then finds how many more + if (ready_task_cores != 0) { + avg_additional_tasks_cores = (workers_total_avail_cores - ready_task_cores) / (ready_task_cores / count); + } else { + avg_additional_tasks_cores = workers_total_avail_cores; + } + if (ready_task_memory != 0) { + avg_additional_tasks_memory = (workers_total_avail_memory - ready_task_memory) / (ready_task_memory / count); + } else { + avg_additional_tasks_memory = workers_total_avail_cores; + } + if (ready_task_disk != 0) { + avg_additional_tasks_disk = (workers_total_avail_disk - ready_task_disk) / (ready_task_disk / count); + } else { +
avg_additional_tasks_disk = workers_total_avail_cores; + } + if (ready_task_gpus != 0) { + avg_additional_tasks_gpus = (workers_total_avail_gpus - ready_task_gpus) / (ready_task_gpus / count); + } else { + avg_additional_tasks_gpus = workers_total_avail_cores; + } + } else { + return workers_total_avail_cores; // this returns number of cores if no tasks in queue and we have resources. + } + // find the limiting factor + int64_t min = avg_additional_tasks_cores; + if (avg_additional_tasks_memory < min) + min = avg_additional_tasks_memory; + if (avg_additional_tasks_disk < min) + min = avg_additional_tasks_disk; + if (avg_additional_tasks_gpus < min) + min = avg_additional_tasks_gpus; + if (min < 0) + min = 0; // if for some reason we have a negative, just make it 0 + + return min; } int vine_workers_shutdown(struct vine_manager *q, int n) From 96bbc710c0e222361001b25938eecd7f90919922 Mon Sep 17 00:00:00 2001 From: Barry Sly-Delgado Date: Tue, 15 Oct 2024 11:07:22 -0400 Subject: [PATCH 8/8] update doc to include task_mode for DaskVine (#3946) --- doc/manuals/taskvine/index.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md index 8904d91635..cd7ccdee4a 100644 --- a/doc/manuals/taskvine/index.md +++ b/doc/manuals/taskvine/index.md @@ -2721,6 +2721,8 @@ The `compute` call above may receive the following keyword arguments: | lazy\_transfer | Whether to bring each result back from the workers (False, default), or keep transient results at workers (True) | | resources | A dictionary to specify [maximum resources](#task-resources), e.g. `{"cores": 1, "memory": 2000"}` | | resources\_mode | [Automatic resource management](#automatic-resource-management) to use, e.g., "fixed", "max", or "max throughput"| +| task\_mode | Mode used to execute individual tasks, e.g., "tasks" or "function-calls" (see [function calls](#serverless-computing)) | + ### Further Information