Skip to content

Commit

Permalink
Put all "kill" messages under reap_task_from_worker, so that all success/failure paths for tasks will delete the task sandbox. (#4019)
Browse files Browse the repository at this point in the history

Fixes a storage leak that resulted from task sandboxes not always being cleaned up.
  • Loading branch information
dthain authored Jan 13, 2025
1 parent ba4b5bb commit 123c9cb
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w)
t->time_workers_execute_all += delta_time;
}

/* Remove the unfinished task and update data structures. */
reap_task_from_worker(q, w, t, VINE_TASK_READY);

// recreate inputs lost
Expand Down Expand Up @@ -1352,9 +1353,8 @@ static int fetch_outputs_from_worker(struct vine_manager *q, struct vine_worker_

vine_accumulate_task(q, t);

// At this point, a task is completed.
/* Remove the completed task and update all data structures. */
reap_task_from_worker(q, w, t, VINE_TASK_RETRIEVED);
vine_manager_send(q, w, "kill %d\n", t->task_id);

switch (t->result) {
case VINE_RESULT_INPUT_MISSING:
Expand Down Expand Up @@ -3058,7 +3058,7 @@ static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w
}

/*
Collect a completed task from a worker, and then update
Remove a running or completed task from a worker, and then update
all auxiliary data structures to remove the association
and change the task state.
*/
Expand All @@ -3073,6 +3073,9 @@ static void reap_task_from_worker(struct vine_manager *q, struct vine_worker_inf
/* Make sure the task and worker agree before changing anything. */
assert(t->worker == w);

/* Tell worker to remove the task sandbox (and if necessary, the running process) */
vine_manager_send(q, w, "kill %d\n", t->task_id);

w->total_task_time += t->time_workers_execute_last;

rmsummary_delete(t->current_resource_box);
Expand Down Expand Up @@ -3746,22 +3749,17 @@ static void reset_task_to_state(struct vine_manager *q, struct vine_task *t, vin
break;

case VINE_TASK_RUNNING:
// t->worker must be set if in RUNNING state.
/* t->worker must be set if in RUNNING state */
assert(w);

// send message to worker asking to kill its task.
vine_manager_send(q, w, "kill %d\n", t->task_id);
debug(D_VINE, "Task with id %d has been cancelled at worker %s (%s) and removed.", t->task_id, w->hostname, w->addrport);
/* Remove the running task and update all data structures. */
reap_task_from_worker(q, w, t, new_state);

// Delete any input files that are not to be cached.
/* After task is killed, delete non-cacheable inputs and all (incomplete) output files from the worker cache. */
delete_worker_files(q, w, t->input_mounts, VINE_CACHE_LEVEL_TASK);

// Delete all output files since they are not needed as the task was cancelled.
delete_worker_files(q, w, t->output_mounts, VINE_CACHE_LEVEL_FOREVER);

// Collect task structure from worker.
// Note that this calls change_task_state internally.
reap_task_from_worker(q, w, t, new_state);
/* change_task_state() happened inside reap_task_from_worker */

break;

Expand Down

0 comments on commit 123c9cb

Please sign in to comment.