Commit
Merge branch 'cooperative-computing-lab:master' into daskvine_priority_scheduling_algorithm
JinZhou5042 authored Dec 3, 2024
2 parents 8d3060b + ca1b4d1 commit 804e7f8
Showing 4 changed files with 283 additions and 81 deletions.
177 changes: 96 additions & 81 deletions taskvine/src/manager/vine_manager.c
@@ -114,6 +114,9 @@ See the file COPYING for details.
/* Default value for how frequently to check for tasks that do not fit any worker. */
#define VINE_LARGE_TASK_CHECK_INTERVAL 180000000 // 3 minutes in usecs

/* Default value for how frequently to allow calls to vine_hungry_computation. */
#define VINE_HUNGRY_CHECK_INTERVAL 5000000 // 5 seconds in usecs

/* Default timeout for slow workers to come back to the pool, can be set prior to creating a manager. */
double vine_option_blocklist_slow_workers_timeout = 900;

@@ -3947,6 +3950,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->max_task_stdout_storage = MAX_TASK_STDOUT_STORAGE;
q->max_new_workers = MAX_NEW_WORKERS;
q->large_task_check_interval = VINE_LARGE_TASK_CHECK_INTERVAL;
q->hungry_check_interval = VINE_HUNGRY_CHECK_INTERVAL;
q->option_blocklist_slow_workers_timeout = vine_option_blocklist_slow_workers_timeout;

q->manager_preferred_connection = xxstrdup("by_ip");
@@ -5201,35 +5205,57 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag,

// check whether workers' resources are available to execute more tasks; the queue should
// have at least MAX(hungry_minimum, hungry_minimum_factor * number of workers) ready tasks.
// Usually not called directly, but via vine_hungry.
//@param: struct vine_manager* - pointer to manager
//@return: approximate number of additional tasks if hungry, 0 otherwise
int vine_hungry(struct vine_manager *q)
int vine_hungry_computation(struct vine_manager *q)
{
// check if the manager is initialized
// return 0 (not hungry) if not
if (q == NULL) {
return 0;
}

struct vine_stats qstats;
vine_get_stats(q, &qstats);

// if the number of ready tasks in the queue is less than the minimum, then it is hungry
if (qstats.tasks_waiting < q->hungry_minimum) {
return q->hungry_minimum - qstats.tasks_waiting;
// set the minimum tasks running to 1; if it were 0, committed resources would be 0 anyway, so the averages work out to 0.
int64_t tasks_running = MAX(qstats.tasks_running, 1);
int64_t tasks_waiting = qstats.tasks_waiting;

/* queue is hungry according to the number of workers available (assume each worker can run at least one task) */
int hungry_minimum = MAX(q->hungry_minimum, qstats.workers_connected * q->hungry_minimum_factor);

if (tasks_running < 1 && tasks_waiting < 1) {
return hungry_minimum;
}

/* assume a task uses at least one core, otherwise if no resource is specified, the queue is infinitely hungry */
int64_t avg_commited_tasks_cores = MAX(1, DIV_INT_ROUND_UP(qstats.committed_cores, tasks_running));
int64_t avg_commited_tasks_memory = DIV_INT_ROUND_UP(qstats.committed_memory, tasks_running);
int64_t avg_commited_tasks_disk = DIV_INT_ROUND_UP(qstats.committed_disk, tasks_running);
int64_t avg_commited_tasks_gpus = DIV_INT_ROUND_UP(qstats.committed_gpus, tasks_running);

// get the total available resources (cores, memory, disk, gpus) across all workers of this manager
// available = total (all) - committed (actual in use)
int64_t workers_total_avail_cores = 0;
int64_t workers_total_avail_memory = 0;
int64_t workers_total_avail_disk = 0;
int64_t workers_total_avail_gpus = 0;
// Find available resources (2 * total - committed)
workers_total_avail_cores = 2 * qstats.total_cores - qstats.committed_cores;
workers_total_avail_memory = 2 * qstats.total_memory - qstats.committed_memory;
workers_total_avail_gpus = 2 * qstats.total_gpus - qstats.committed_gpus;
workers_total_avail_disk = qstats.total_disk - qstats.committed_disk; // never overcommit disk
// available = factor*total (all) - committed (actual in use)
int64_t workers_total_avail_cores = q->hungry_minimum_factor * qstats.total_cores - qstats.committed_cores;
int64_t workers_total_avail_memory = q->hungry_minimum_factor * qstats.total_memory - qstats.committed_memory;
int64_t workers_total_avail_disk = q->hungry_minimum_factor * qstats.total_disk - qstats.committed_disk;
int64_t workers_total_avail_gpus = q->hungry_minimum_factor * qstats.total_gpus - qstats.committed_gpus;

int64_t tasks_needed = 0;
if (tasks_waiting < 1) {
tasks_needed = DIV_INT_ROUND_UP(workers_total_avail_cores, avg_commited_tasks_cores);
if (avg_commited_tasks_memory > 0) {
tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_memory, avg_commited_tasks_memory));
}

if (avg_commited_tasks_disk > 0) {
tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_disk, avg_commited_tasks_disk));
}

if (avg_commited_tasks_gpus > 0) {
tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_gpus, avg_commited_tasks_gpus));
}

return MAX(tasks_needed, hungry_minimum);
}

// from here on we can assume that tasks_waiting > 0.

// get the required resources (cores, memory, disk, gpus) of the waiting tasks,
// iterating through (a sample of) the tasks in the ready queue.
@@ -5240,79 +5266,68 @@ int vine_hungry(struct vine_manager *q)

int t_idx;
struct vine_task *t;
int iter_count = 0;
int iter_depth = priority_queue_size(q->ready_tasks);

PRIORITY_QUEUE_BASE_ITERATE(q->ready_tasks, t_idx, t, iter_count, iter_depth)
int iter_depth = MIN(q->attempt_schedule_depth, tasks_waiting);
int sampled_tasks_waiting = 0;
PRIORITY_QUEUE_BASE_ITERATE(q->ready_tasks, t_idx, t, sampled_tasks_waiting, iter_depth)
{
ready_task_cores += MAX(1, t->resources_requested->cores);
ready_task_memory += t->resources_requested->memory;
ready_task_disk += t->resources_requested->disk;
ready_task_gpus += t->resources_requested->gpus;
/* unset resources are marked with -1, so we add what we know from the currently running tasks */
ready_task_cores += t->resources_requested->cores > 0 ? t->resources_requested->cores : avg_commited_tasks_cores;
ready_task_memory += t->resources_requested->memory > 0 ? t->resources_requested->memory : avg_commited_tasks_memory;
ready_task_disk += t->resources_requested->disk > 0 ? t->resources_requested->disk : avg_commited_tasks_disk;
ready_task_gpus += t->resources_requested->gpus > 0 ? t->resources_requested->gpus : avg_commited_tasks_gpus;
}

int count = priority_queue_size(q->ready_tasks);
int64_t avg_ready_tasks_cores = DIV_INT_ROUND_UP(ready_task_cores, sampled_tasks_waiting);
int64_t avg_ready_tasks_memory = DIV_INT_ROUND_UP(ready_task_memory, sampled_tasks_waiting);
int64_t avg_ready_tasks_disk = DIV_INT_ROUND_UP(ready_task_disk, sampled_tasks_waiting);
int64_t avg_ready_tasks_gpus = DIV_INT_ROUND_UP(ready_task_gpus, sampled_tasks_waiting);

int64_t avg_additional_tasks_cores, avg_additional_tasks_memory, avg_additional_tasks_disk, avg_additional_tasks_gpus;
// since sampled_tasks_waiting > 0 and avg_commited_tasks_cores > 0, then ready_task_cores > 0 and avg_ready_tasks_cores > 0
tasks_needed = DIV_INT_ROUND_UP(workers_total_avail_cores, avg_ready_tasks_cores);

if (ready_task_cores > workers_total_avail_cores) {
return 0;
if (avg_ready_tasks_memory > 0) {
tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_memory, avg_ready_tasks_memory));
}
if (ready_task_memory > workers_total_avail_memory) {
return 0;

if (avg_ready_tasks_disk > 0) {
tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_disk, avg_ready_tasks_disk));
}
if (ready_task_disk > workers_total_avail_disk) {
return 0;

if (avg_ready_tasks_gpus > 0) {
tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_gpus, avg_ready_tasks_gpus));
}
if (ready_task_gpus > workers_total_avail_gpus) {

tasks_needed = MAX(0, MAX(tasks_needed, hungry_minimum) - tasks_waiting);

return tasks_needed;
}

/*
* Finding out the number of tasks needed when the manager is hungry is a potentially
* expensive operation if there are many workers connected or there are already many
* waiting tasks. However, the number of tasks needed only changes significantly when
* the number of connected workers changes, and this does not happen very often. Thus
* we only run the expensive computation every few seconds, and in between these calls
* we just keep track of how many tasks have been added to or removed from the ready
* queue since we last really checked.
* */
int vine_hungry(struct vine_manager *q)
{
if (!q) {
return 0;
}

if (ready_task_cores < 0)
ready_task_cores = 0;
if (ready_task_memory < 0)
ready_task_memory = 0;
if (ready_task_disk < 0)
ready_task_disk = 0;
if (ready_task_gpus < 0)
ready_task_gpus = 0;
timestamp_t current_time = timestamp_get();

if (count != 0) { // each statement counts the available (2*total - committed), further subtracts the ready/in-queue tasks, and then finds how many more
if (ready_task_cores != 0) {
avg_additional_tasks_cores = (workers_total_avail_cores - ready_task_cores) / (ready_task_cores / count);
} else {
avg_additional_tasks_cores = workers_total_avail_cores;
}
if (ready_task_memory != 0) {
avg_additional_tasks_memory = (workers_total_avail_memory - ready_task_memory) / (ready_task_memory / count);
} else {
avg_additional_tasks_memory = workers_total_avail_cores;
}
if (ready_task_disk != 0) {
avg_additional_tasks_disk = (workers_total_avail_disk - ready_task_disk) / (ready_task_disk / count);
} else {
avg_additional_tasks_disk = workers_total_avail_cores;
}
if (ready_task_gpus != 0) {
avg_additional_tasks_gpus = (workers_total_avail_gpus - ready_task_gpus) / (ready_task_gpus / count);
} else {
avg_additional_tasks_gpus = workers_total_avail_cores;
}
} else {
return workers_total_avail_cores; // this returns number of cores if no tasks in queue and we have resources.
}
// find the limiting factor
int64_t min = avg_additional_tasks_cores;
if (avg_additional_tasks_memory < min)
min = avg_additional_tasks_memory;
if (avg_additional_tasks_disk < min)
min = avg_additional_tasks_disk;
if (avg_additional_tasks_gpus < min)
min = avg_additional_tasks_gpus;
if (min < 0)
min = 0; // if for some reason we have a negative, just make it 0

return min;
if (current_time - q->time_last_hungry > q->hungry_check_interval) {
q->time_last_hungry = current_time;
q->tasks_waiting_last_hungry = priority_queue_size(q->ready_tasks);
q->tasks_to_sate_hungry = vine_hungry_computation(q);
}

int change = priority_queue_size(q->ready_tasks) - q->tasks_waiting_last_hungry;

return MAX(0, q->tasks_to_sate_hungry - change);
}

int vine_workers_shutdown(struct vine_manager *q, int n)
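The rewritten vine_hungry_computation estimates how many additional tasks would fill the cluster by dividing the most constrained available resource by the average per-task footprint. Below is a self-contained sketch of that limiting-factor arithmetic with made-up numbers; the DIV_INT_ROUND_UP/MIN/MAX macros are local stand-ins for the dttools versions, so this is illustrative rather than the library's code.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Local stand-ins for the dttools macros used in vine_manager.c. */
#define DIV_INT_ROUND_UP(x, y) (((x) + (y)-1) / (y))
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#define MAX(a, b) ((a) > (b) ? (a) : (b))

int main(void)
{
	/* Hypothetical stats: 10 running tasks, workers with 64 cores and
	   512 GB of memory in total, hungry_minimum_factor = 2. */
	int64_t tasks_running = 10;
	int64_t committed_cores = 40, total_cores = 64;
	int64_t committed_memory = 200, total_memory = 512;
	int64_t factor = 2;

	/* Average footprint of one running task, rounded up. */
	int64_t avg_cores = MAX(1, DIV_INT_ROUND_UP(committed_cores, tasks_running)); /* 4 */
	int64_t avg_memory = DIV_INT_ROUND_UP(committed_memory, tasks_running);       /* 20 */

	/* Capacity left when overcommitting up to factor * total. */
	int64_t avail_cores = factor * total_cores - committed_cores;    /* 88 */
	int64_t avail_memory = factor * total_memory - committed_memory; /* 824 */

	/* The scarcest resource bounds how many more tasks would fit. */
	int64_t tasks_needed = DIV_INT_ROUND_UP(avail_cores, avg_cores); /* 22 */
	if (avg_memory > 0)
		tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(avail_memory, avg_memory)); /* MIN(22, 42) */

	printf("about %" PRId64 " more tasks would sate the manager\n", tasks_needed);
	return 0;
}

Here cores are the limiting factor (22 tasks) even though memory alone would admit 42, which is exactly the MIN-reduction the patch performs per resource.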
6 changes: 6 additions & 0 deletions taskvine/src/manager/vine_manager.h
@@ -176,6 +176,12 @@ struct vine_manager {
int file_source_max_transfers;
int worker_source_max_transfers;

/* Hungry call optimization */
timestamp_t time_last_hungry; /* Last time vine_hungry_computation was called. */
int tasks_to_sate_hungry; /* Number of tasks that would sate the queue, as of the last call to vine_hungry_computation. */
int tasks_waiting_last_hungry; /* Number of tasks originally waiting when call to vine_hungry_computation was made. */
timestamp_t hungry_check_interval; /* Minimum interval between consecutive vine_hungry_computation calls. */

/* Various performance knobs that can be tuned. */
int short_timeout; /* Timeout in seconds to send/recv a brief message from worker */
int long_timeout; /* Timeout if in the middle of an incomplete message. */
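These four fields implement the caching strategy documented above vine_hungry: the full computation runs at most once per hungry_check_interval, and between runs the cached answer is corrected by the change in ready-queue size. A stripped-down sketch of the pattern, with hypothetical callbacks standing in for the manager internals:

#include <stdint.h>
#include <sys/time.h>

typedef uint64_t timestamp_t;

/* Stand-in for the dttools timestamp_get(): wall clock in usecs. */
static timestamp_t timestamp_get(void)
{
	struct timeval tv;
	gettimeofday(&tv, 0);
	return (timestamp_t)tv.tv_sec * 1000000 + tv.tv_usec;
}

struct hungry_cache {
	timestamp_t time_last_hungry;      /* when the full computation last ran */
	timestamp_t hungry_check_interval; /* e.g. 5000000 usecs */
	int tasks_to_sate_hungry;          /* cached result of the full computation */
	int tasks_waiting_last_hungry;     /* ready-queue size at that moment */
};

/* full_computation and ready_queue_size are hypothetical callbacks. */
int hungry_cached(struct hungry_cache *c, int (*full_computation)(void), int (*ready_queue_size)(void))
{
	timestamp_t now = timestamp_get();
	if (now - c->time_last_hungry > c->hungry_check_interval) {
		c->time_last_hungry = now;
		c->tasks_waiting_last_hungry = ready_queue_size();
		c->tasks_to_sate_hungry = full_computation();
	}
	/* Tasks added to the ready queue since the last full run reduce
	   the hunger estimate; tasks dispatched from it increase it. */
	int change = ready_queue_size() - c->tasks_waiting_last_hungry;
	int needed = c->tasks_to_sate_hungry - change;
	return needed > 0 ? needed : 0;
}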
38 changes: 38 additions & 0 deletions taskvine/test/TR_vine_hungry.sh
@@ -0,0 +1,38 @@
#!/bin/sh
set -ex

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR

export PATH=$(pwd)/../src/worker:$(pwd)/../../batch_job/src:$PATH
export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH

check_needed()
{
[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1
}

prepare()
{
return 0
}

run()
{
# exec the python test so that its exit status becomes the result of run().
exec ${CCTOOLS_PYTHON_TEST_EXEC} vine_python_hungry.py
}

clean()
{
rm -rf vine-run-info

exit 0
}


dispatch "$@"

# vim: set noexpandtab tabstop=4:
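This harness runs vine_python_hungry.py (not shown in this diff). From the application side, vine_hungry is the call a submit loop consults to keep the manager sated; here is a minimal sketch assuming the public TaskVine C API from taskvine.h (vine_create, vine_hungry, vine_submit, vine_wait, vine_empty):

#include "taskvine.h"
#include <stdio.h>

int main(void)
{
	struct vine_manager *m = vine_create(VINE_DEFAULT_PORT);
	if (!m) {
		fprintf(stderr, "could not create manager\n");
		return 1;
	}

	int submitted = 0;
	while (submitted < 1000 || !vine_empty(m)) {
		/* Top up the ready queue only while the manager reports hunger. */
		int n = vine_hungry(m);
		while (n-- > 0 && submitted < 1000) {
			struct vine_task *t = vine_task_create("echo hello");
			vine_submit(m, t);
			submitted++;
		}
		/* Reap one finished task; vine_wait also drives the scheduler. */
		struct vine_task *done = vine_wait(m, 5);
		if (done) {
			vine_task_delete(done);
		}
	}

	vine_delete(m);
	return 0;
}

With the interval-based caching in this commit, calling vine_hungry once per loop iteration stays cheap even with many workers connected.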
