From c291b5d93d613eebf15aca6aa094480f57b86937 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Wed, 16 Oct 2024 12:40:26 -0400 Subject: [PATCH] add txid per input file --- taskvine/src/manager/vine_file.c | 1 + taskvine/src/manager/vine_file.h | 1 + taskvine/src/manager/vine_manager.c | 25 +++++++++++++++++++++++++ taskvine/src/manager/vine_manager_put.c | 10 ++-------- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/taskvine/src/manager/vine_file.c b/taskvine/src/manager/vine_file.c index d2c236f930..5a63b7d135 100644 --- a/taskvine/src/manager/vine_file.c +++ b/taskvine/src/manager/vine_file.c @@ -67,6 +67,7 @@ int vine_file_delete(struct vine_file *f) free(f->source); free(f->cached_name); free(f->data); + free(f->transfer_id); free(f); } diff --git a/taskvine/src/manager/vine_file.h b/taskvine/src/manager/vine_file.h index 10bb785776..f83a592e61 100644 --- a/taskvine/src/manager/vine_file.h +++ b/taskvine/src/manager/vine_file.h @@ -43,6 +43,7 @@ struct vine_file { struct vine_task *recovery_task; // For temp files, a copy of the task that created it. struct vine_worker_info *source_worker; // if this is a substitute file, attach the worker serving it. int change_message_shown; // True if error message already shown. + char *transfer_id; // Contains the transfer id when file is a remote url/peer transfer. int refcount; // Number of references from a task object, delete when zero. }; diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 65c61d9660..c84c024fb5 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -3093,6 +3093,9 @@ static int vine_manager_transfer_capacity_available(struct vine_manager *q, stru { struct vine_mount *m; + /* Keep a list of txid created for peer transfers, in case we decide not to schedule we can remove them */ + struct list *list_txid_scheduled = list_create(); + LIST_ITERATE(t->input_mounts, m) { /* Is the file already present on that worker? */ @@ -3117,6 +3120,11 @@ static int vine_manager_transfer_capacity_available(struct vine_manager *q, stru if ((peer = vine_file_replica_table_find_worker(q, m->file->cached_name))) { char *peer_source = string_format("%s/%s", peer->transfer_url, m->file->cached_name); m->substitute = vine_file_substitute_url(m->file, peer_source, peer); + + char *transfer_id = vine_current_transfers_add(q, w, peer, 0); + m->substitute->transfer_id = xxstrdup(transfer_id); + list_push_head(list_txid_scheduled, transfer_id); + free(peer_source); found_match = 1; } @@ -3135,6 +3143,11 @@ static int vine_manager_transfer_capacity_available(struct vine_manager *q, stru if (m->file->type == VINE_URL) { /* For a URL transfer, we can fall back to the original if capacity is available. */ if (vine_current_transfers_url_in_use(q, m->file->source) >= q->file_source_max_transfers) { + char *txid; + LIST_ITERATE(list_txid_scheduled, txid) + { + vine_current_transfers_remove(q, txid); + } return 0; } else { /* keep going */ @@ -3142,9 +3155,19 @@ static int vine_manager_transfer_capacity_available(struct vine_manager *q, stru } else if (m->file->type == VINE_TEMP) { // debug(D_VINE,"task %lld has no ready transfer source for temp %s",(long // long)t->task_id,m->file->cached_name); + char *txid; + LIST_ITERATE(list_txid_scheduled, txid) + { + vine_current_transfers_remove(q, txid); + } return 0; } else if (m->file->type == VINE_MINI_TASK) { if (!vine_manager_transfer_capacity_available(q, w, m->file->mini_task)) { + char *txid; + LIST_ITERATE(list_txid_scheduled, txid) + { + vine_current_transfers_remove(q, txid); + } return 0; } } else { @@ -3152,6 +3175,8 @@ static int vine_manager_transfer_capacity_available(struct vine_manager *q, stru } } + list_delete(list_txid_scheduled); + debug(D_VINE, "task %lld has a ready transfer source for all files", (long long)t->task_id); return 1; } diff --git a/taskvine/src/manager/vine_manager_put.c b/taskvine/src/manager/vine_manager_put.c index 83dbaeb64f..e9e742011a 100644 --- a/taskvine/src/manager/vine_manager_put.c +++ b/taskvine/src/manager/vine_manager_put.c @@ -232,14 +232,11 @@ vine_result_code_t vine_manager_put_url_now(struct vine_manager *q, struct vine_ url_encode(source, source_encoded, sizeof(source_encoded)); url_encode(f->cached_name, cached_name_encoded, sizeof(cached_name_encoded)); - char *transfer_id = vine_current_transfers_add(q, w, f->source_worker, source); - - vine_manager_send(q, w, "puturl_now %s %s %d %lld 0%o %s\n", source_encoded, cached_name_encoded, f->cache_level, (long long)f->size, mode, transfer_id); + vine_manager_send(q, w, "puturl_now %s %s %d %lld 0%o %s\n", source_encoded, cached_name_encoded, f->cache_level, (long long)f->size, mode, f->transfer_id); struct vine_file_replica *replica = vine_file_replica_create(f->type, f->cache_level, f->size, f->mtime); vine_file_replica_table_insert(q, w, f->cached_name, replica); - free(transfer_id); return VINE_SUCCESS; } @@ -268,14 +265,11 @@ vine_result_code_t vine_manager_put_url(struct vine_manager *q, struct vine_work url_encode(f->source, source_encoded, sizeof(source_encoded)); url_encode(f->cached_name, cached_name_encoded, sizeof(cached_name_encoded)); - char *transfer_id = vine_current_transfers_add(q, w, f->source_worker, f->source); - - vine_manager_send(q, w, "puturl %s %s %d %lld 0%o %s\n", source_encoded, cached_name_encoded, f->cache_level, (long long)f->size, mode, transfer_id); + vine_manager_send(q, w, "puturl %s %s %d %lld 0%o %s\n", source_encoded, cached_name_encoded, f->cache_level, (long long)f->size, mode, f->transfer_id); struct vine_file_replica *replica = vine_file_replica_create(f->type, f->cache_level, f->size, f->mtime); vine_file_replica_table_insert(q, w, f->cached_name, replica); - free(transfer_id); return VINE_SUCCESS; }