diff --git a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c index 388bf2f2f5d..101fdae7a71 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c +++ b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c @@ -59,6 +59,7 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, MPIDI_OFI_idata_set_gpuchunk_bits(&cq_data, n_chunks); MPIDI_OFI_idata_set_gpu_packed_bit(&cq_data, is_packed); + MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) = n_chunks; MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data) = cq_data; MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr) = remote_addr; MPIDI_OFI_REQUEST(sreq, pipeline_info.send.vci_local) = vci_local; @@ -123,9 +124,6 @@ static int send_alloc_poll(MPIR_Async_thing * thing) p->offset += (size_t) chunk_sz; p->left_sz -= (size_t) chunk_sz; p->n_chunks++; - /* Increase request completion cnt, cc is 1 more than necessary - * to prevent parent request being freed prematurally. */ - MPIR_cc_inc(p->sreq->cc_ptr); spawn_send_copy(thing, p->sreq, &async_req, host_buf, chunk_sz); @@ -228,11 +226,10 @@ int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Reques MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, host_buf); - int c; - MPIR_cc_decr(sreq->cc_ptr, &c); - if (c == 0) { + MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) -= 1; + if (MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) == 0) { MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype)); - MPIR_Request_free(sreq); + MPIDI_Request_complete_fast(sreq); } return mpi_errno; @@ -259,6 +256,8 @@ int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq, /* The 1st recv is an empty chunk for matching. We need initialize rreq. */ MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset) = 0; + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) = 0; + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 0; MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync) = false; MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr) = remote_addr; MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.vci_local) = vci_local; @@ -305,7 +304,8 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) struct recv_alloc *p = MPIR_Async_thing_get_state(thing); MPIR_Request *rreq = p->rreq; - if (MPIR_cc_get(rreq->cc) > 1) { + /* arbitary threshold */ + if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) > 1) { return MPIR_ASYNC_THING_NOPROGRESS; } @@ -339,6 +339,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) match_bits, mask_bits, (void *) &chunk_req->context); MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci).lock); if (ret == 0) { + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) += 1; free(p); /* chunk_req and host_buf will be freed in recv_events */ return MPIR_ASYNC_THING_DONE; @@ -382,17 +383,18 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data); uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data); /* ? - Not sure why sender cannot send packed data */ - MPIR_Assertp(packed == 0); + MPIR_Assert(packed == 0); if (wc->len > 0) { /* message from a normal send */ MPIR_Assert(n_chunks == 0); + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 1; mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); MPIR_ERR_CHECK(mpi_errno); } else { MPIR_Assert(n_chunks > 0); + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = n_chunks; /* There is no data in the init chunk, free the buffer */ MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf); - MPIR_cc_dec(rreq->cc_ptr); /* Post recv for the remaining chunks. */ for (int i = 0; i < n_chunks; i++) { mpi_errno = start_recv_chunk(rreq, i, n_chunks); @@ -401,6 +403,7 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques } } else { MPIR_Assert(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE); + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) -= 1; mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); MPIR_ERR_CHECK(mpi_errno); } @@ -478,9 +481,8 @@ static int recv_copy_poll(MPIR_Async_thing * thing) static void recv_copy_complete(MPIR_Request * rreq, void *buf) { int mpi_errno = MPI_SUCCESS; - int c; - MPIR_cc_decr(rreq->cc_ptr, &c); - if (c == 0) { + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) -= 1; + if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) == 0) { /* all chunks arrived and copied */ if (unlikely(MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync))) { MPIR_Comm *comm = rreq->comm; @@ -511,7 +513,7 @@ static void recv_copy_complete(MPIR_Request * rreq, void *buf) /* Set number of bytes in status. */ MPIR_STATUS_SET_COUNT(rreq->status, MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset)); - MPIR_Request_free(rreq); + MPIDI_Request_complete_fast(rreq); } /* Free host buffer, yaksa request and task. */ diff --git a/src/mpid/ch4/netmod/ofi/ofi_pre.h b/src/mpid/ch4/netmod/ofi/ofi_pre.h index bfc9ea5b53c..07b999ca808 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_pre.h +++ b/src/mpid/ch4/netmod/ofi/ofi_pre.h @@ -223,6 +223,7 @@ typedef struct { fi_addr_t remote_addr; uint64_t cq_data; uint64_t match_bits; + int num_remain; } send; struct { int vci_local; @@ -231,6 +232,8 @@ typedef struct { uint64_t match_bits; uint64_t mask_bits; MPI_Aint offset; + int num_inrecv; + int num_remain; bool is_sync; } recv; } pipeline_info; /* GPU pipeline */