Skip to content

Commit

Permalink
ch4/ofi: use explicit counters to track gpu pipeline
Browse files Browse the repository at this point in the history
Don't mix the usage of cc_ptr, use separate and explicit counters to
track the progress and completion of chunks.
  • Loading branch information
hzhou committed Feb 6, 2024
1 parent 4b78f19 commit 63cf008
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
30 changes: 16 additions & 14 deletions src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
MPIDI_OFI_idata_set_gpuchunk_bits(&cq_data, n_chunks);
MPIDI_OFI_idata_set_gpu_packed_bit(&cq_data, is_packed);

MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) = n_chunks;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data) = cq_data;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.vci_local) = vci_local;
Expand Down Expand Up @@ -123,9 +124,6 @@ static int send_alloc_poll(MPIR_Async_thing * thing)
p->offset += (size_t) chunk_sz;
p->left_sz -= (size_t) chunk_sz;
p->n_chunks++;
/* Increase request completion cnt, cc is 1 more than necessary
* to prevent parent request being freed prematurally. */
MPIR_cc_inc(p->sreq->cc_ptr);

spawn_send_copy(thing, p->sreq, &async_req, host_buf, chunk_sz);

Expand Down Expand Up @@ -228,11 +226,10 @@ int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Reques

MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, host_buf);

int c;
MPIR_cc_decr(sreq->cc_ptr, &c);
if (c == 0) {
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) -= 1;
if (MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) == 0) {
MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype));
MPIR_Request_free(sreq);
MPIDI_Request_complete_fast(sreq);
}

return mpi_errno;
Expand All @@ -259,6 +256,8 @@ int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq,

/* The 1st recv is an empty chunk for matching. We need initialize rreq. */
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset) = 0;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) = 0;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 0;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync) = false;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.vci_local) = vci_local;
Expand Down Expand Up @@ -305,7 +304,8 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
struct recv_alloc *p = MPIR_Async_thing_get_state(thing);
MPIR_Request *rreq = p->rreq;

if (MPIR_cc_get(rreq->cc) > 1) {
/* arbitary threshold */
if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) > 1) {
return MPIR_ASYNC_THING_NOPROGRESS;
}

Expand Down Expand Up @@ -339,6 +339,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
match_bits, mask_bits, (void *) &chunk_req->context);
MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci).lock);
if (ret == 0) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) += 1;
free(p);
/* chunk_req and host_buf will be freed in recv_events */
return MPIR_ASYNC_THING_DONE;
Expand Down Expand Up @@ -382,17 +383,18 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques
uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data);
uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data);
/* ? - Not sure why sender cannot send packed data */
MPIR_Assertp(packed == 0);
MPIR_Assert(packed == 0);
if (wc->len > 0) {
/* message from a normal send */
MPIR_Assert(n_chunks == 0);
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 1;
mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
} else {
MPIR_Assert(n_chunks > 0);
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = n_chunks;
/* There is no data in the init chunk, free the buffer */
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf);
MPIR_cc_dec(rreq->cc_ptr);
/* Post recv for the remaining chunks. */
for (int i = 0; i < n_chunks; i++) {
mpi_errno = start_recv_chunk(rreq, i, n_chunks);
Expand All @@ -401,6 +403,7 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques
}
} else {
MPIR_Assert(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE);
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) -= 1;
mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
}
Expand Down Expand Up @@ -478,9 +481,8 @@ static int recv_copy_poll(MPIR_Async_thing * thing)
static void recv_copy_complete(MPIR_Request * rreq, void *buf)
{
int mpi_errno = MPI_SUCCESS;
int c;
MPIR_cc_decr(rreq->cc_ptr, &c);
if (c == 0) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) -= 1;
if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) == 0) {
/* all chunks arrived and copied */
if (unlikely(MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync))) {
MPIR_Comm *comm = rreq->comm;
Expand Down Expand Up @@ -511,7 +513,7 @@ static void recv_copy_complete(MPIR_Request * rreq, void *buf)
/* Set number of bytes in status. */
MPIR_STATUS_SET_COUNT(rreq->status, MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset));

MPIR_Request_free(rreq);
MPIDI_Request_complete_fast(rreq);
}

/* Free host buffer, yaksa request and task. */
Expand Down
3 changes: 3 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_pre.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ typedef struct {
fi_addr_t remote_addr;
uint64_t cq_data;
uint64_t match_bits;
int num_remain;
} send;
struct {
int vci_local;
Expand All @@ -231,6 +232,8 @@ typedef struct {
uint64_t match_bits;
uint64_t mask_bits;
MPI_Aint offset;
int num_inrecv;
int num_remain;
bool is_sync;
} recv;
} pipeline_info; /* GPU pipeline */
Expand Down

0 comments on commit 63cf008

Please sign in to comment.