From e885f9241bacb77c7f6d2bb5a781d14885913d38 Mon Sep 17 00:00:00 2001
From: Benjamin Tovar
Date: Mon, 2 Dec 2024 12:35:04 -0500
Subject: [PATCH] vine: cleanup hungry feature (#3990)

* add committed resources and hungry_factor to hungry feature

  Use committed resources as a fallback when waiting tasks do not declare
  resources. Also fix the use of hungry_factor, instead of simply using
  the number 2.

* add test

* format

* limit sample of waiting tasks to attempt_schedule_depth

* use number of workers as minimum

* correctly consider sampling of tasks
---
 taskvine/src/manager/vine_manager.c | 144 +++++++++++++---------------
 taskvine/test/TR_vine_hungry.sh     |  38 ++++++++
 taskvine/test/vine_python_hungry.py | 143 +++++++++++++++++++++++++++
 3 files changed, 246 insertions(+), 79 deletions(-)
 create mode 100755 taskvine/test/TR_vine_hungry.sh
 create mode 100644 taskvine/test/vine_python_hungry.py

diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c
index a9ca71d52f..3f1da9f1da 100644
--- a/taskvine/src/manager/vine_manager.c
+++ b/taskvine/src/manager/vine_manager.c
@@ -5205,31 +5205,56 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag,
 //@return: approximate number of additional tasks if hungry, 0 otherwise
 int vine_hungry(struct vine_manager *q)
 {
-	// check if manager is initialized
-	// return false if not
-	if (q == NULL) {
+	if (!q) {
 		return 0;
 	}
 
 	struct vine_stats qstats;
 	vine_get_stats(q, &qstats);
 
-	// if number of ready tasks is the queue is less than the miniumum, then it is hungry
-	if (qstats.tasks_waiting < q->hungry_minimum) {
-		return q->hungry_minimum - qstats.tasks_waiting;
+	// set min tasks running to 1. if it was 0, then committed resources would be 0 anyway, so the average works out to 0.
+	int64_t tasks_running = MAX(qstats.tasks_running, 1);
+	int64_t tasks_waiting = qstats.tasks_waiting;
+
+	/* queue is hungry according to the number of workers available (assume each worker can run at least one task) */
+	int hungry_minimum = MAX(q->hungry_minimum, qstats.workers_connected * q->hungry_minimum_factor);
+
+	if (tasks_running < 1 && tasks_waiting < 1) {
+		return hungry_minimum;
 	}
 
+	/* assume a task uses at least one core, otherwise if no resource is specified, the queue is infinitely hungry */
+	int64_t avg_commited_tasks_cores = MAX(1, DIV_INT_ROUND_UP(qstats.committed_cores, tasks_running));
+	int64_t avg_commited_tasks_memory = DIV_INT_ROUND_UP(qstats.committed_memory, tasks_running);
+	int64_t avg_commited_tasks_disk = DIV_INT_ROUND_UP(qstats.committed_disk, tasks_running);
+	int64_t avg_commited_tasks_gpus = DIV_INT_ROUND_UP(qstats.committed_gpus, tasks_running);
+
 	// get total available resources consumption (cores, memory, disk, gpus) of all workers of this manager
-	// available = total (all) - committed (actual in use)
-	int64_t workers_total_avail_cores = 0;
-	int64_t workers_total_avail_memory = 0;
-	int64_t workers_total_avail_disk = 0;
-	int64_t workers_total_avail_gpus = 0;
-	// Find available resources (2 * total - committed)
-	workers_total_avail_cores = 2 * qstats.total_cores - qstats.committed_cores;
-	workers_total_avail_memory = 2 * qstats.total_memory - qstats.committed_memory;
-	workers_total_avail_gpus = 2 * qstats.total_gpus - qstats.committed_gpus;
-	workers_total_avail_disk = 2 * qstats.total_disk - qstats.committed_disk; // never overcommit disk
+	// available = factor*total (all) - committed (actual in use)
+	int64_t workers_total_avail_cores = q->hungry_minimum_factor * qstats.total_cores - qstats.committed_cores;
+	int64_t workers_total_avail_memory = q->hungry_minimum_factor * qstats.total_memory - qstats.committed_memory;
+	int64_t workers_total_avail_disk = q->hungry_minimum_factor * qstats.total_disk - qstats.committed_disk;
+	int64_t workers_total_avail_gpus = q->hungry_minimum_factor * qstats.total_gpus - qstats.committed_gpus;
+
+	int64_t tasks_needed = 0;
+	if (tasks_waiting < 1) {
+		tasks_needed = DIV_INT_ROUND_UP(workers_total_avail_cores, avg_commited_tasks_cores);
+		if (avg_commited_tasks_memory > 0) {
+			tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_memory, avg_commited_tasks_memory));
+		}
+
+		if (avg_commited_tasks_disk > 0) {
+			tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_disk, avg_commited_tasks_disk));
+		}
+
+		if (avg_commited_tasks_gpus > 0) {
+			tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_gpus, avg_commited_tasks_gpus));
+		}
+
+		return MAX(tasks_needed, hungry_minimum);
+	}
+
+	// from here on we can assume that tasks_waiting > 0.
+
 	// get required resources (cores, memory, disk, gpus) of one (all?) waiting tasks
 	// seems to iterate through all tasks counted in the queue.
@@ -5240,79 +5265,40 @@ int vine_hungry(struct vine_manager *q)
 	int t_idx;
 	struct vine_task *t;
 
-	int iter_count = 0;
-	int iter_depth = priority_queue_size(q->ready_tasks);
-
-	PRIORITY_QUEUE_BASE_ITERATE(q->ready_tasks, t_idx, t, iter_count, iter_depth)
+	int iter_depth = MIN(q->attempt_schedule_depth, tasks_waiting);
+	int sampled_tasks_waiting = 0;
+	PRIORITY_QUEUE_BASE_ITERATE(q->ready_tasks, t_idx, t, sampled_tasks_waiting, iter_depth)
 	{
-		ready_task_cores += MAX(1, t->resources_requested->cores);
-		ready_task_memory += t->resources_requested->memory;
-		ready_task_disk += t->resources_requested->disk;
-		ready_task_gpus += t->resources_requested->gpus;
+		/* unset resources are marked with -1, so we add what we know from currently running tasks */
+		ready_task_cores += t->resources_requested->cores > 0 ? t->resources_requested->cores : avg_commited_tasks_cores;
+		ready_task_memory += t->resources_requested->memory > 0 ? t->resources_requested->memory : avg_commited_tasks_memory;
+		ready_task_disk += t->resources_requested->disk > 0 ? t->resources_requested->disk : avg_commited_tasks_disk;
+		ready_task_gpus += t->resources_requested->gpus > 0 ? t->resources_requested->gpus : avg_commited_tasks_gpus;
 	}
 
-	int count = priority_queue_size(q->ready_tasks);
+	int64_t avg_ready_tasks_cores = DIV_INT_ROUND_UP(ready_task_cores, sampled_tasks_waiting);
+	int64_t avg_ready_tasks_memory = DIV_INT_ROUND_UP(ready_task_memory, sampled_tasks_waiting);
+	int64_t avg_ready_tasks_disk = DIV_INT_ROUND_UP(ready_task_disk, sampled_tasks_waiting);
+	int64_t avg_ready_tasks_gpus = DIV_INT_ROUND_UP(ready_task_gpus, sampled_tasks_waiting);
 
-	int64_t avg_additional_tasks_cores, avg_additional_tasks_memory, avg_additional_tasks_disk, avg_additional_tasks_gpus;
+	// since sampled_tasks_waiting > 0 and avg_commited_tasks_cores > 0, then ready_task_cores > 0 and avg_ready_tasks_cores > 0
+	tasks_needed = DIV_INT_ROUND_UP(workers_total_avail_cores, avg_ready_tasks_cores);
 
-	if (ready_task_cores > workers_total_avail_cores) {
-		return 0;
-	}
-	if (ready_task_memory > workers_total_avail_memory) {
-		return 0;
+	if (avg_ready_tasks_memory > 0) {
+		tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_memory, avg_ready_tasks_memory));
 	}
-	if (ready_task_disk > workers_total_avail_disk) {
-		return 0;
+
+	if (avg_ready_tasks_disk > 0) {
+		tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_disk, avg_ready_tasks_disk));
 	}
-	if (ready_task_gpus > workers_total_avail_gpus) {
-		return 0;
+
+	if (avg_ready_tasks_gpus > 0) {
+		tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_gpus, avg_ready_tasks_gpus));
 	}
 
-	if (ready_task_cores < 0)
-		ready_task_cores = 0;
-	if (ready_task_memory < 0)
-		ready_task_memory = 0;
-	if (ready_task_disk < 0)
-		ready_task_disk = 0;
-	if (ready_task_gpus < 0)
-		ready_task_gpus = 0;
+	tasks_needed = MAX(0, MAX(tasks_needed, hungry_minimum) - tasks_waiting);
 
-	if (count != 0) { // each statement counts the available (2*total - committed) and further subtracts the ready/in-queue tasks and then finds how mabny more
-		if (ready_task_cores != 0) {
-			avg_additional_tasks_cores = (workers_total_avail_cores - ready_task_cores) / (ready_task_cores / count);
-		} else {
-			avg_additional_tasks_cores = workers_total_avail_cores;
-		}
-		if (ready_task_memory != 0) {
-			avg_additional_tasks_memory = (workers_total_avail_memory - ready_task_memory) / (ready_task_memory / count);
-		} else {
-			avg_additional_tasks_memory = workers_total_avail_cores;
-		}
-		if (ready_task_disk != 0) {
-			avg_additional_tasks_disk = (workers_total_avail_disk - ready_task_disk) / (ready_task_disk / count);
-		} else {
-			avg_additional_tasks_disk = workers_total_avail_cores;
-		}
-		if (ready_task_gpus != 0) {
-			avg_additional_tasks_gpus = (workers_total_avail_gpus - ready_task_gpus) / (ready_task_gpus / count);
-		} else {
-			avg_additional_tasks_gpus = workers_total_avail_cores;
-		}
-	} else {
-		return workers_total_avail_cores; // this returns number of cores if no tasks in queue and we have resources.
-	}
-	// find the limiting factor
-	int64_t min = avg_additional_tasks_cores;
-	if (avg_additional_tasks_memory < min)
-		min = avg_additional_tasks_memory;
-	if (avg_additional_tasks_disk < min)
-		min = avg_additional_tasks_disk;
-	if (avg_additional_tasks_gpus < min)
-		min = avg_additional_tasks_gpus;
-	if (min < 0)
-		min = 0; // if for some reason we have a negative, just make it 0
-
-	return min;
+	return tasks_needed;
 }
 
 int vine_workers_shutdown(struct vine_manager *q, int n)
diff --git a/taskvine/test/TR_vine_hungry.sh b/taskvine/test/TR_vine_hungry.sh
new file mode 100755
index 0000000000..6960df2130
--- /dev/null
+++ b/taskvine/test/TR_vine_hungry.sh
@@ -0,0 +1,38 @@
+#!/bin/sh
+set -ex
+
+. ../../dttools/test/test_runner_common.sh
+
+import_config_val CCTOOLS_PYTHON_TEST_EXEC
+import_config_val CCTOOLS_PYTHON_TEST_DIR
+
+export PATH=$(pwd)/../src/worker:$(pwd)/../../batch_job/src:$PATH
+export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH
+
+check_needed()
+{
+	[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1
+}
+
+prepare()
+{
+	return 0
+}
+
+run()
+{
+	# exec the python test directly so its exit status becomes the result of this test.
+	exec ${CCTOOLS_PYTHON_TEST_EXEC} vine_python_hungry.py
+}
+
+clean()
+{
+	rm -rf vine-run-info
+
+	exit 0
+}
+
+
+dispatch "$@"
+
+# vim: set noexpandtab tabstop=4:
diff --git a/taskvine/test/vine_python_hungry.py b/taskvine/test/vine_python_hungry.py
new file mode 100644
index 0000000000..7ddd6d76cd
--- /dev/null
+++ b/taskvine/test/vine_python_hungry.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python
+
+import ndcctools.taskvine as vine
+import signal
+import sys
+
+
+def timeout(signum, frame):
+    print("hungry test did not finish in time")
+    sys.exit(1)
+
+
+command = "sleep 120"
+
+
+signal.signal(signal.SIGALRM, timeout)
+signal.alarm(600)
+
+
+m = vine.Manager(port=0)
+
+m.tune("hungry-minimum", 11)
+assert m.hungry() == 11
+
+m.tune("hungry-minimum", 2)
+assert m.hungry() == 2
+
+t_1 = vine.Task(command)
+i_1 = m.submit(t_1)
+
+assert m.hungry() == 1
+
+t_2 = vine.Task(command)
+i_2 = m.submit(t_2)
+
+assert m.hungry() == 0
+
+worker_cores = 12
+worker_memory = 1200
+worker_disk = 1200
+
+workers = vine.Factory("local", manager=m)
+workers.max_workers = 1
+workers.min_workers = 1
+workers.cores = worker_cores
+workers.disk = worker_disk
+workers.memory = worker_memory
+
+with workers:
+    while m.stats.tasks_running < 1:
+        m.wait(1)
+
+    # hungry-minimum is 2, 2 tasks submitted, one waiting, thus hungry for 1 task
+    assert m.hungry() == 1
+
+    m.tune("hungry-minimum", 5)
+
+    # hungry-minimum is 5, 2 tasks submitted, one waiting, thus hungry for 4 tasks
+    assert m.hungry() == 4
+
+    m.cancel_by_task_id(i_1)
+    m.cancel_by_task_id(i_2)
+
+    while m.stats.tasks_running > 0:
+        m.wait(1)
+
+    # hungry-minimum is 5, no tasks submitted, thus hungry for max of 5 tasks and 2 x worker_cores
+    assert m.hungry() == 2 * worker_cores
+
+    t_3 = vine.Task(command)
+    t_3.set_cores(1)
+    i_3 = m.submit(t_3)
+
+    while m.stats.tasks_running < 1:
+        m.wait(1)
+
+    # hungry-minimum is 5, and a task with one core is running. max of 5 and 2 x worker_cores - cores running
+    assert m.hungry() == worker_cores * 2 - 1
+
+    factor = 3
+    m.tune("hungry-minimum-factor", factor)
+
+    # as previous, but availability is now computed with a different hungry factor
+    assert m.hungry() == worker_cores * factor - 1
+
+    t_4 = vine.Task(command)
+    t_4.set_cores(worker_cores - 1)
+    i_4 = m.submit(t_4)
+
+    while m.stats.tasks_running < 2:
+        m.wait(1)
+
+    # hungry-minimum is 5, and all cores are being used
+    assert m.hungry() == 5
+
+    m.cancel_by_task_id(i_3)
+    m.cancel_by_task_id(i_4)
+
+    while m.stats.tasks_running > 0:
+        m.wait(1)
+
+    mem_task = int(2 * worker_memory / worker_cores)
+    t_5 = vine.Task(command)
+    t_5.set_cores(1)
+    t_5.set_memory(mem_task)
+    i_5 = m.submit(t_5)
+
+    while m.stats.tasks_running < 1:
+        m.wait(1)
+
+    # memory should be the limiting factor here
+    assert m.hungry() == (factor * worker_memory - mem_task) / mem_task
+
+    m.cancel_by_task_id(i_5)
+
+    cores_t_6 = 1
+    t_6 = vine.Task(command)
+    t_6.set_cores(cores_t_6)
+    t_6.set_memory(1)
+    t_6.set_disk(1)
+    i_6 = m.submit(t_6)
+
+    cores_t_7 = 11
+    t_7 = vine.Task(command)
+    t_7.set_cores(cores_t_7)
+    t_7.set_memory(1)
+    t_7.set_disk(1)
+    i_7 = m.submit(t_7)
+
+    while m.stats.tasks_running < 2:
+        m.wait(1)
+
+    cores_t_8 = 2
+    t_8 = vine.Task(command)
+    t_8.set_cores(cores_t_8)
+    i_8 = m.submit(t_8)
+
+    factor = 10
+    m.tune("hungry-minimum-factor", factor)
+
+    # avg cores of waiting tasks should be the limiting factor:
+    # each additional task would get two cores, minus the one task already waiting
+    assert m.hungry() == (factor * worker_cores - cores_t_6 - cores_t_7) / cores_t_8 - 1
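
Note for reviewers: per resource r, the new estimate reduces to
tasks_needed = max(0, max(min_r ceil(avail_r / avg_task_r), hungry_minimum) - tasks_waiting),
with avail_r = hungry_minimum_factor * total_r - committed_r, and with unset task
requests falling back to the per-task committed averages. The sketch below is a
minimal Python model of that arithmetic, for cores only (memory, disk, and gpus
follow the same MIN() pattern). It is illustrative and not part of the patch:
Stats and estimate_hungry are hypothetical stand-ins for struct vine_stats and
vine_hungry().

#!/usr/bin/env python
# Illustrative sketch only: mirrors the arithmetic of the new vine_hungry().
from dataclasses import dataclass
from math import ceil


@dataclass
class Stats:  # hypothetical stand-in for the relevant fields of struct vine_stats
    workers_connected: int
    tasks_running: int
    tasks_waiting: int
    total_cores: int
    committed_cores: int


def estimate_hungry(s, hungry_minimum=10, factor=2, waiting_cores=()):
    # hungry according to connected workers: assume each can run at least one task
    minimum = max(hungry_minimum, s.workers_connected * factor)
    if s.tasks_running < 1 and s.tasks_waiting < 1:
        return minimum
    running = max(s.tasks_running, 1)
    # assume at least one core per task, otherwise the estimate diverges
    avg_committed = max(1, ceil(s.committed_cores / running))
    available = factor * s.total_cores - s.committed_cores
    if s.tasks_waiting < 1:
        # nothing waiting: size the estimate from the committed averages
        return max(ceil(available / avg_committed), minimum)
    # sample the waiting tasks; unset (-1) requests fall back to the committed average
    sample = [c if c > 0 else avg_committed for c in waiting_cores]
    avg_ready = ceil(sum(sample) / len(sample))
    needed = ceil(available / avg_ready)
    return max(0, max(needed, minimum) - s.tasks_waiting)


# One worker with 12 cores, a 1-core task running and a 2-core task waiting:
# available = 2*12 - 1 = 23, avg_ready = 2, needed = ceil(23/2) = 12,
# minus the task already waiting -> hungry for 11 more tasks.
s = Stats(workers_connected=1, tasks_running=1, tasks_waiting=1,
          total_cores=12, committed_cores=1)
assert estimate_hungry(s, hungry_minimum=2, factor=2, waiting_cores=[2]) == 11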