Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vine: add up peer transfer ids during scheduling #3958

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ int vine_file_delete(struct vine_file *f)
free(f->source);
free(f->cached_name);
free(f->data);
free(f->transfer_id);
free(f);
}

Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct vine_file {
struct vine_task *recovery_task; // For temp files, a copy of the task that created it.
struct vine_worker_info *source_worker; // if this is a substitute file, attach the worker serving it.
int change_message_shown; // True if error message already shown.
char *transfer_id; // Contains the transfer id when file is a remote url/peer transfer.
int refcount; // Number of references from a task object, delete when zero.
};

Expand Down
25 changes: 25 additions & 0 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -3093,6 +3093,9 @@ static int vine_manager_transfer_capacity_available(struct vine_manager *q, stru
{
struct vine_mount *m;

/* Keep a list of txid created for peer transfers, in case we decide not to schedule we can remove them */
struct list *list_txid_scheduled = list_create();

LIST_ITERATE(t->input_mounts, m)
{
/* Is the file already present on that worker? */
Expand All @@ -3117,6 +3120,11 @@ static int vine_manager_transfer_capacity_available(struct vine_manager *q, stru
if ((peer = vine_file_replica_table_find_worker(q, m->file->cached_name))) {
char *peer_source = string_format("%s/%s", peer->transfer_url, m->file->cached_name);
m->substitute = vine_file_substitute_url(m->file, peer_source, peer);

char *transfer_id = vine_current_transfers_add(q, w, peer, 0);
m->substitute->transfer_id = xxstrdup(transfer_id);
list_push_head(list_txid_scheduled, transfer_id);

free(peer_source);
found_match = 1;
}
Expand All @@ -3135,23 +3143,40 @@ static int vine_manager_transfer_capacity_available(struct vine_manager *q, stru
if (m->file->type == VINE_URL) {
/* For a URL transfer, we can fall back to the original if capacity is available. */
if (vine_current_transfers_url_in_use(q, m->file->source) >= q->file_source_max_transfers) {
char *txid;
LIST_ITERATE(list_txid_scheduled, txid)
{
vine_current_transfers_remove(q, txid);
}
return 0;
} else {
/* keep going */
}
} else if (m->file->type == VINE_TEMP) {
// debug(D_VINE,"task %lld has no ready transfer source for temp %s",(long
// long)t->task_id,m->file->cached_name);
char *txid;
LIST_ITERATE(list_txid_scheduled, txid)
{
vine_current_transfers_remove(q, txid);
}
return 0;
} else if (m->file->type == VINE_MINI_TASK) {
if (!vine_manager_transfer_capacity_available(q, w, m->file->mini_task)) {
char *txid;
LIST_ITERATE(list_txid_scheduled, txid)
{
vine_current_transfers_remove(q, txid);
}
return 0;
}
} else {
/* keep going */
}
}

list_delete(list_txid_scheduled);

debug(D_VINE, "task %lld has a ready transfer source for all files", (long long)t->task_id);
return 1;
}
Expand Down
10 changes: 2 additions & 8 deletions taskvine/src/manager/vine_manager_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,11 @@ vine_result_code_t vine_manager_put_url_now(struct vine_manager *q, struct vine_
url_encode(source, source_encoded, sizeof(source_encoded));
url_encode(f->cached_name, cached_name_encoded, sizeof(cached_name_encoded));

char *transfer_id = vine_current_transfers_add(q, w, f->source_worker, source);

vine_manager_send(q, w, "puturl_now %s %s %d %lld 0%o %s\n", source_encoded, cached_name_encoded, f->cache_level, (long long)f->size, mode, transfer_id);
vine_manager_send(q, w, "puturl_now %s %s %d %lld 0%o %s\n", source_encoded, cached_name_encoded, f->cache_level, (long long)f->size, mode, f->transfer_id);

struct vine_file_replica *replica = vine_file_replica_create(f->type, f->cache_level, f->size, f->mtime);
vine_file_replica_table_insert(q, w, f->cached_name, replica);

free(transfer_id);
return VINE_SUCCESS;
}

Expand Down Expand Up @@ -268,14 +265,11 @@ vine_result_code_t vine_manager_put_url(struct vine_manager *q, struct vine_work
url_encode(f->source, source_encoded, sizeof(source_encoded));
url_encode(f->cached_name, cached_name_encoded, sizeof(cached_name_encoded));

char *transfer_id = vine_current_transfers_add(q, w, f->source_worker, f->source);

vine_manager_send(q, w, "puturl %s %s %d %lld 0%o %s\n", source_encoded, cached_name_encoded, f->cache_level, (long long)f->size, mode, transfer_id);
vine_manager_send(q, w, "puturl %s %s %d %lld 0%o %s\n", source_encoded, cached_name_encoded, f->cache_level, (long long)f->size, mode, f->transfer_id);

struct vine_file_replica *replica = vine_file_replica_create(f->type, f->cache_level, f->size, f->mtime);
vine_file_replica_table_insert(q, w, f->cached_name, replica);

free(transfer_id);
return VINE_SUCCESS;
}

Expand Down
Loading