diff --git a/dummy b/dummy new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/dummy @@ -0,0 +1 @@ +1 diff --git a/src/include/mpiimpl.h b/src/include/mpiimpl.h index 35e5484132a..c21894d2ba3 100644 --- a/src/include/mpiimpl.h +++ b/src/include/mpiimpl.h @@ -155,7 +155,6 @@ typedef struct MPIR_Stream MPIR_Stream; /******************* PART 3: DEVICE INDEPENDENT HEADERS **********************/ /*****************************************************************************/ -#include "mpir_misc.h" #include "mpir_dbg.h" #include "mpir_objects.h" #include "mpir_strerror.h" @@ -166,6 +165,7 @@ typedef struct MPIR_Stream MPIR_Stream; #include "mpir_mem.h" #include "mpir_info.h" #include "mpir_errcodes.h" +#include "mpir_misc.h" #include "mpir_errhandler.h" #include "mpir_attr_generic.h" #include "mpir_contextid.h" diff --git a/src/include/mpir_misc.h b/src/include/mpir_misc.h index 6a46b13d11c..2a0c1277c13 100644 --- a/src/include/mpir_misc.h +++ b/src/include/mpir_misc.h @@ -49,9 +49,6 @@ extern MPL_initlock_t MPIR_init_lock; #include "typerep_pre.h" /* needed for MPIR_Typerep_req */ -/* FIXME: bad names. Not gpu-specific, confusing with MPIR_Request. - * It's a general async handle. - */ typedef enum { MPIR_NULL_REQUEST = 0, MPIR_TYPEREP_REQUEST, @@ -64,7 +61,27 @@ typedef struct { MPL_gpu_request gpu_req; } u; MPIR_request_type_t type; -} MPIR_gpu_req; +} MPIR_async_req; + +MPL_STATIC_INLINE_PREFIX void MPIR_async_test(MPIR_async_req * areq, int *is_done) +{ + int err; + switch (areq->type) { + case MPIR_NULL_REQUEST: + /* a dummy, immediately complete */ + *is_done = 1; + break; + case MPIR_TYPEREP_REQUEST: + MPIR_Typerep_test(areq->u.y_req, is_done); + break; + case MPIR_GPU_REQUEST: + err = MPL_gpu_test(&areq->u.gpu_req, is_done); + MPIR_Assertp(err == MPL_SUCCESS); + break; + default: + MPIR_Assert(0); + } +} int MPIR_Localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype); @@ -82,7 +99,7 @@ int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se MPI_Aint sendoffset, MPL_pointer_attr_t * sendattr, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, MPL_pointer_attr_t * recvattr, MPL_gpu_copy_direction_t dir, - MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * req); + MPL_gpu_engine_type_t enginetype, bool commit, MPIR_async_req * req); /* Contiguous datatype calculates buffer address with `(char *) buf + dt_true_lb`. 
* However, dt_true_lb is treated as ptrdiff_t (signed), and when buf is MPI_BOTTOM diff --git a/src/include/mpir_typerep.h b/src/include/mpir_typerep.h index 536a4502c77..81d9ec48cab 100644 --- a/src/include/mpir_typerep.h +++ b/src/include/mpir_typerep.h @@ -79,8 +79,6 @@ int MPIR_Typerep_ipack(const void *inbuf, MPI_Aint incount, MPI_Datatype datatyp int MPIR_Typerep_iunpack(const void *inbuf, MPI_Aint insize, void *outbuf, MPI_Aint outcount, MPI_Datatype datatype, MPI_Aint outoffset, MPI_Aint * actual_unpack_bytes, MPIR_Typerep_req * typerep_req, uint32_t flags); -int MPIR_Typerep_wait(MPIR_Typerep_req typerep_req); -int MPIR_Typerep_test(MPIR_Typerep_req typerep_req, int *completed); int MPIR_Typerep_size_external32(MPI_Datatype type); int MPIR_Typerep_pack_external(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype, diff --git a/src/mpi/datatype/typerep/src/typerep_pre.h b/src/mpi/datatype/typerep/src/typerep_pre.h index 022510fbe2c..347bed20a41 100644 --- a/src/mpi/datatype/typerep/src/typerep_pre.h +++ b/src/mpi/datatype/typerep/src/typerep_pre.h @@ -28,4 +28,7 @@ typedef struct { #define MPIR_TYPEREP_HANDLE_NULL NULL #endif +int MPIR_Typerep_wait(MPIR_Typerep_req typerep_req); +int MPIR_Typerep_test(MPIR_Typerep_req typerep_req, int *completed); + #endif /* TYPEREP_PRE_H_INCLUDED */ diff --git a/src/mpi/misc/utils.c b/src/mpi/misc/utils.c index 88de7f1b40c..8be94576821 100644 --- a/src/mpi/misc/utils.c +++ b/src/mpi/misc/utils.c @@ -188,7 +188,8 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp MPI_Aint sendoffset, MPL_pointer_attr_t * send_attr, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, MPL_pointer_attr_t * recv_attr, MPL_gpu_copy_direction_t dir, - MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * gpu_req) + MPL_gpu_engine_type_t enginetype, bool commit, + MPIR_async_req * async_req) { int mpi_errno = MPI_SUCCESS; int mpl_errno = MPL_SUCCESS; @@ -200,8 +201,8 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp MPIR_FUNC_ENTER; - if (gpu_req) - gpu_req->type = MPIR_NULL_REQUEST; + if (async_req) + async_req->type = MPIR_NULL_REQUEST; MPIR_Datatype_get_size_macro(sendtype, sendsize); MPIR_Datatype_get_size_macro(recvtype, recvsize); @@ -260,7 +261,7 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp MPIR_ERR_CHKANDJUMP(dev_id == -1, mpi_errno, MPI_ERR_OTHER, "**mpl_gpu_get_dev_id_from_attr"); - if (gpu_req == NULL) { + if (async_req == NULL) { MPL_gpu_request req; mpl_errno = MPL_gpu_imemcpy((char *) MPIR_get_contig_ptr(recvbuf, recvtype_true_lb) + @@ -281,8 +282,8 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp recvoffset, (char *) MPIR_get_contig_ptr(sendbuf, sendtype_true_lb) + sendoffset, copy_sz, dev_id, dir, enginetype, - &gpu_req->u.gpu_req, commit); - gpu_req->type = MPIR_GPU_REQUEST; + &async_req->u.gpu_req, commit); + async_req->type = MPIR_GPU_REQUEST; } } #else @@ -300,15 +301,15 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp fn_fail: goto fn_exit; fn_fallback: - if (gpu_req) { + if (async_req) { mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, sendoffset, recvbuf, recvcount, recvtype, - recvoffset, LOCALCOPY_NONBLOCKING, &gpu_req->u.y_req); + recvoffset, LOCALCOPY_NONBLOCKING, &async_req->u.y_req); MPIR_ERR_CHECK(mpi_errno); - if (gpu_req->u.y_req.req == MPIR_TYPEREP_REQ_NULL) { - gpu_req->type = MPIR_NULL_REQUEST; + if 
(async_req->u.y_req.req == MPIR_TYPEREP_REQ_NULL) { + async_req->type = MPIR_NULL_REQUEST; } else { - gpu_req->type = MPIR_TYPEREP_REQUEST; + async_req->type = MPIR_TYPEREP_REQUEST; } } else { mpi_errno = @@ -414,7 +415,7 @@ int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se MPI_Aint sendoffset, MPL_pointer_attr_t * sendattr, void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset, MPL_pointer_attr_t * recvattr, MPL_gpu_copy_direction_t dir, - MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * req) + MPL_gpu_engine_type_t enginetype, bool commit, MPIR_async_req * async_req) { int mpi_errno = MPI_SUCCESS; @@ -423,14 +424,14 @@ int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se #ifdef MPL_HAVE_GPU mpi_errno = do_localcopy_gpu(sendbuf, sendcount, sendtype, sendoffset, sendattr, recvbuf, recvcount, - recvtype, recvoffset, recvattr, dir, enginetype, commit, req); + recvtype, recvoffset, recvattr, dir, enginetype, commit, async_req); MPIR_ERR_CHECK(mpi_errno); #else mpi_errno = do_localcopy(sendbuf, sendcount, sendtype, sendoffset, recvbuf, recvcount, recvtype, - recvoffset, LOCALCOPY_NONBLOCKING, &req->u.y_req); + recvoffset, LOCALCOPY_NONBLOCKING, &async_req->u.y_req); MPIR_ERR_CHECK(mpi_errno); - req->type = MPIR_TYPEREP_REQUEST; + async_req->type = MPIR_TYPEREP_REQUEST; #endif fn_exit: diff --git a/src/mpid/ch4/netmod/ofi/Makefile.mk b/src/mpid/ch4/netmod/ofi/Makefile.mk index 0ccce9d6181..01addc1021c 100644 --- a/src/mpid/ch4/netmod/ofi/Makefile.mk +++ b/src/mpid/ch4/netmod/ofi/Makefile.mk @@ -21,6 +21,7 @@ mpi_core_sources += src/mpid/ch4/netmod/ofi/func_table.c \ src/mpid/ch4/netmod/ofi/ofi_progress.c \ src/mpid/ch4/netmod/ofi/ofi_am_events.c \ src/mpid/ch4/netmod/ofi/ofi_nic.c \ + src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c \ src/mpid/ch4/netmod/ofi/globals.c \ src/mpid/ch4/netmod/ofi/init_provider.c \ src/mpid/ch4/netmod/ofi/init_settings.c \ diff --git a/src/mpid/ch4/netmod/ofi/ofi_am_impl.h b/src/mpid/ch4/netmod/ofi/ofi_am_impl.h index dc3bd2bcfac..6a1ab05c040 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_am_impl.h +++ b/src/mpid/ch4/netmod/ofi/ofi_am_impl.h @@ -28,7 +28,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_progress_do_queue(int vci_idx); * The seq need be tracked between local (rank, vci) and remote (rank, vci). * We don't need local rank since it is implicit on each process. * - * LOCAL_ID is send to remote precess to identify self. + * LOCAL_ID is send to remote process to identify self. * REMOTE_ID is used locally to track remote process. * I realize the confusing part of the naming. * @@ -568,7 +568,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_emulated_inject(MPIR_Comm * comm, fi_a memcpy(ibuf + sizeof(*msg_hdrp), am_hdr, am_hdr_sz); MPIDI_OFI_REQUEST(sreq, event_id) = MPIDI_OFI_EVENT_INJECT_EMU; - MPIDI_OFI_REQUEST(sreq, util.inject_buf) = ibuf; + MPIDI_OFI_REQUEST(sreq, u.am_inject_emu.inject_buf) = ibuf; MPIDI_OFI_global.per_vci[vci_src].am_inflight_inject_emus += 1; MPIDI_OFI_CALL_RETRY_AM(fi_send(MPIDI_OFI_global.ctx[ctx_idx].tx, ibuf, len, diff --git a/src/mpid/ch4/netmod/ofi/ofi_comm.c b/src/mpid/ch4/netmod/ofi/ofi_comm.c index 57b9cb131de..8936941498a 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_comm.c +++ b/src/mpid/ch4/netmod/ofi/ofi_comm.c @@ -145,6 +145,9 @@ int MPIDI_OFI_mpi_comm_commit_pre_hook(MPIR_Comm * comm) MPIDI_OFI_COMM(comm).enable_hashing = 0; MPIDI_OFI_COMM(comm).pref_nic = NULL; + /* Initialize tag for gpu_pipeline chunks; incremented by sender. 
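+ * Each pipelined send draws a fresh tag from this counter so that its
+ * chunks are matched on their own bits, independent of the MPI tag
+ * space. A sketch of the intended use (hedged; the actual send path is
+ * in ofi_gpu_pipeline.c in this patch):
+ *
+ *   int pipeline_tag = MPIDI_OFI_COMM(comm).pipeline_tag++;
+ *   the init packet carries pipeline_tag in its header, and every
+ *   chunk then matches on: pipeline_tag | MPIDI_OFI_GPU_PIPELINE_SEND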
*/ + MPIDI_OFI_COMM(comm).pipeline_tag = 0; + if (comm->hints[MPIR_COMM_HINT_ENABLE_MULTI_NIC_STRIPING] == -1) { comm->hints[MPIR_COMM_HINT_ENABLE_MULTI_NIC_STRIPING] = MPIR_CVAR_CH4_OFI_ENABLE_MULTI_NIC_STRIPING; diff --git a/src/mpid/ch4/netmod/ofi/ofi_events.c b/src/mpid/ch4/netmod/ofi/ofi_events.c index 84046946d11..75b574fe1fb 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_events.c +++ b/src/mpid/ch4/netmod/ofi/ofi_events.c @@ -80,165 +80,6 @@ static int peek_empty_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request return MPI_SUCCESS; } -/* GPU pipeline events */ -static int pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r) -{ - int mpi_errno = MPI_SUCCESS; - int c; - MPIDI_OFI_gpu_pipeline_request *req; - MPIR_Request *sreq; - void *wc_buf = NULL; - MPIR_FUNC_ENTER; - - req = (MPIDI_OFI_gpu_pipeline_request *) r; - /* get original mpi request */ - sreq = req->parent; - wc_buf = req->buf; - MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, wc_buf); - - MPIR_cc_decr(sreq->cc_ptr, &c); - if (c == 0) { - MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype)); - MPIR_Request_free(sreq); - } - MPL_free(r); - - MPIR_FUNC_EXIT; - return mpi_errno; -} - -static int pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r, int event_id) -{ - int mpi_errno = MPI_SUCCESS; - int vci_local, i; - MPIDI_OFI_gpu_pipeline_request *req; - MPIR_Request *rreq; - void *wc_buf = NULL; - int in_use MPL_UNUSED; - MPIDI_OFI_gpu_task_t *task = NULL; - int engine_type = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_H2D_ENGINE_TYPE; - - MPIR_FUNC_ENTER; - - req = (MPIDI_OFI_gpu_pipeline_request *) r; - rreq = req->parent; - wc_buf = req->buf; - MPL_free(r); - - void *recv_buf = MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf); - size_t recv_count = MPIDI_OFI_REQUEST(rreq, noncontig.pack.count); - MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype); - - fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, pipeline_info.remote_addr); - vci_local = MPIDI_OFI_REQUEST(rreq, pipeline_info.vci_local); - - if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) { - rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true); - rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data); - rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag); - - if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) { - MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = true; - } - - uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data); - uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data); - if (likely(packed == 0)) { - if (wc->len > 0) { - MPIR_Assert(n_chunks == 0); - /* First chunk arrives. */ - MPI_Aint actual_unpack_bytes; - MPIR_gpu_req yreq; - mpi_errno = - MPIR_Ilocalcopy_gpu(wc_buf, wc->len, MPI_BYTE, 0, NULL, recv_buf, recv_count, - datatype, 0, NULL, MPL_GPU_COPY_H2D, engine_type, 1, &yreq); - MPIR_ERR_CHECK(mpi_errno); - actual_unpack_bytes = wc->len; - task = - MPIDI_OFI_create_gpu_task(MPIDI_OFI_PIPELINE_RECV, wc_buf, - actual_unpack_bytes, rreq, yreq); - DL_APPEND(MPIDI_OFI_global.gpu_recv_task_queue[vci_local], task); - MPIDI_OFI_REQUEST(rreq, pipeline_info.offset) += (size_t) actual_unpack_bytes; - } else { - /* free this chunk */ - MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, wc_buf); - MPIR_Assert(n_chunks > 0); - /* Post recv for remaining chunks. 
*/ - MPIR_cc_dec(rreq->cc_ptr); - for (i = 0; i < n_chunks; i++) { - int c; - MPIR_cc_incr(rreq->cc_ptr, &c); - - size_t chunk_sz = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ; - - char *host_buf = NULL; - MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, - (void **) &host_buf); - - MPIDI_OFI_REQUEST(rreq, event_id) = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE; - - MPIDI_OFI_gpu_pipeline_request *chunk_req = NULL; - chunk_req = (MPIDI_OFI_gpu_pipeline_request *) - MPL_malloc(sizeof(MPIDI_OFI_gpu_pipeline_request), MPL_MEM_BUFFER); - if (chunk_req == NULL) { - mpi_errno = MPIR_ERR_OTHER; - goto fn_fail; - } - chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE; - chunk_req->parent = rreq; - chunk_req->buf = host_buf; - int ret = 0; - if (!MPIDI_OFI_global.gpu_recv_queue && host_buf) { - ret = fi_trecv - (MPIDI_OFI_global.ctx - [MPIDI_OFI_REQUEST(rreq, pipeline_info.ctx_idx)].rx, - host_buf, chunk_sz, NULL, remote_addr, - MPIDI_OFI_REQUEST(rreq, - pipeline_info.match_bits) | - MPIDI_OFI_GPU_PIPELINE_SEND, MPIDI_OFI_REQUEST(rreq, - pipeline_info. - mask_bits), - (void *) &chunk_req->context); - } - if (MPIDI_OFI_global.gpu_recv_queue || !host_buf || ret != 0) { - MPIDI_OFI_gpu_pending_recv_t *recv_task = - MPIDI_OFI_create_recv_task(chunk_req, i, n_chunks); - DL_APPEND(MPIDI_OFI_global.gpu_recv_queue, recv_task); - } - } - } - } else { - MPIR_ERR_CHKANDJUMP(true, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed"); - } - } else { - if (likely(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE)) { - /* FIXME: current design unpacks all bytes from host buffer, overflow check is missing. */ - MPI_Aint actual_unpack_bytes; - MPIR_gpu_req yreq; - mpi_errno = - MPIR_Ilocalcopy_gpu(wc_buf, (MPI_Aint) wc->len, MPI_BYTE, 0, NULL, - (char *) recv_buf, (MPI_Aint) recv_count, datatype, - MPIDI_OFI_REQUEST(rreq, pipeline_info.offset), NULL, - MPL_GPU_COPY_H2D, engine_type, 1, &yreq); - MPIR_ERR_CHECK(mpi_errno); - actual_unpack_bytes = wc->len; - MPIDI_OFI_REQUEST(rreq, pipeline_info.offset) += (size_t) actual_unpack_bytes; - task = - MPIDI_OFI_create_gpu_task(MPIDI_OFI_PIPELINE_RECV, wc_buf, actual_unpack_bytes, - rreq, yreq); - DL_APPEND(MPIDI_OFI_global.gpu_recv_task_queue[vci_local], task); - } else { - MPIR_ERR_CHKANDJUMP(true, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed"); - } - } - fn_exit: - MPIR_FUNC_EXIT; - return mpi_errno; - fn_fail: - rreq->status.MPI_ERROR = mpi_errno; - goto fn_exit; -} - static int send_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * sreq) { int mpi_errno = MPI_SUCCESS; @@ -253,7 +94,7 @@ static int send_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request comm = sreq->comm; num_nics = MPIDI_OFI_COMM(comm).enable_striping ? 
MPIDI_OFI_global.num_nics : 1; - huge_send_mrs = MPIDI_OFI_REQUEST(sreq, huge.send_mrs); + huge_send_mrs = MPIDI_OFI_REQUEST(sreq, u.huge_send.mrs); /* Clean up the memory region */ for (int i = 0; i < num_nics; i++) { @@ -264,10 +105,7 @@ static int send_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request } } MPL_free(huge_send_mrs); - - if (MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer)) { - MPL_free(MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer)); - } + MPL_free(MPIDI_OFI_REQUEST(sreq, u.huge_send.pack_buffer)); MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype)); MPIDI_CH4_REQUEST_FREE(sreq); @@ -315,7 +153,7 @@ static int inject_emu_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request MPIR_cc_decr(req->cc_ptr, &incomplete); if (!incomplete) { - MPL_free(MPIDI_OFI_REQUEST(req, util.inject_buf)); + MPL_free(MPIDI_OFI_REQUEST(req, u.am_inject_emu.inject_buf)); MPIDI_CH4_REQUEST_FREE(req); MPIDI_OFI_global.per_vci[vci].am_inflight_inject_emus -= 1; } @@ -597,114 +435,113 @@ int MPIDI_OFI_dispatch_function(int vci, struct fi_cq_tagged_entry *wc, MPIR_Req { int mpi_errno = MPI_SUCCESS; - if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_SEND) { - /* Passing the event_id as a parameter; do not need to load it from the - * request object each time the send_event handler is invoked */ - mpi_errno = MPIDI_OFI_send_event(vci, wc, req, MPIDI_OFI_EVENT_SEND); - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV) { - /* Passing the event_id as a parameter; do not need to load it from the - * request object each time the send_event handler is invoked */ - mpi_errno = MPIDI_OFI_recv_event(vci, wc, req, MPIDI_OFI_EVENT_RECV); - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_AM_SEND) { - mpi_errno = am_isend_event(vci, wc, req); - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_AM_SEND_RDMA) { - mpi_errno = am_isend_rdma_event(vci, wc, req); - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_AM_SEND_PIPELINE) { - mpi_errno = am_isend_pipeline_event(vci, wc, req); - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_AM_RECV) { - if (wc->flags & FI_RECV) - mpi_errno = am_recv_event(vci, wc, req); + switch (MPIDI_OFI_REQUEST(req, event_id)) { + case MPIDI_OFI_EVENT_SEND: + mpi_errno = MPIDI_OFI_send_event(vci, wc, req, MPIDI_OFI_EVENT_SEND); + break; - if (unlikely(wc->flags & FI_MULTI_RECV)) { - MPIDI_OFI_am_repost_request_t *am = (MPIDI_OFI_am_repost_request_t *) req; - mpi_errno = MPIDI_OFI_am_repost_buffer(vci, am->index); - } + case MPIDI_OFI_EVENT_RECV: + mpi_errno = MPIDI_OFI_recv_event(vci, wc, req, MPIDI_OFI_EVENT_RECV); + break; - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_AM_READ) { - mpi_errno = am_read_event(vci, wc, req); - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_SEND_GPU_PIPELINE) { - mpi_errno = pipeline_send_event(wc, req); - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) { - mpi_errno = pipeline_recv_event(wc, req, MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT); - goto fn_exit; - } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE) { - mpi_errno = pipeline_recv_event(wc, req, MPIDI_OFI_EVENT_RECV_GPU_PIPELINE); - goto fn_exit; - } else if (unlikely(1)) { - switch (MPIDI_OFI_REQUEST(req, event_id)) { - case MPIDI_OFI_EVENT_PEEK: - 
mpi_errno = peek_event(vci, wc, req); - break; + case MPIDI_OFI_EVENT_PEEK: + mpi_errno = peek_event(vci, wc, req); + break; - case MPIDI_OFI_EVENT_RECV_HUGE: - if (wc->tag & MPIDI_OFI_HUGE_SEND) { - mpi_errno = MPIDI_OFI_recv_huge_event(vci, wc, req); - } else { - mpi_errno = MPIDI_OFI_recv_event(vci, wc, req, MPIDI_OFI_EVENT_RECV_HUGE); - } - break; + case MPIDI_OFI_EVENT_RECV_HUGE: + if (wc->tag & MPIDI_OFI_HUGE_SEND) { + mpi_errno = MPIDI_OFI_recv_huge_event(vci, wc, req); + } else { + mpi_errno = MPIDI_OFI_recv_event(vci, wc, req, MPIDI_OFI_EVENT_RECV_HUGE); + } + break; - case MPIDI_OFI_EVENT_RECV_PACK: - mpi_errno = MPIDI_OFI_recv_event(vci, wc, req, MPIDI_OFI_EVENT_RECV_PACK); - break; + case MPIDI_OFI_EVENT_RECV_PACK: + mpi_errno = MPIDI_OFI_recv_event(vci, wc, req, MPIDI_OFI_EVENT_RECV_PACK); + break; - case MPIDI_OFI_EVENT_RECV_NOPACK: - mpi_errno = MPIDI_OFI_recv_event(vci, wc, req, MPIDI_OFI_EVENT_RECV_NOPACK); - break; + case MPIDI_OFI_EVENT_RECV_NOPACK: + mpi_errno = MPIDI_OFI_recv_event(vci, wc, req, MPIDI_OFI_EVENT_RECV_NOPACK); + break; - case MPIDI_OFI_EVENT_SEND_HUGE: - mpi_errno = send_huge_event(vci, wc, req); - break; + case MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT: + case MPIDI_OFI_EVENT_RECV_GPU_PIPELINE: + mpi_errno = MPIDI_OFI_gpu_pipeline_recv_event(wc, req); + break; - case MPIDI_OFI_EVENT_SEND_PACK: - mpi_errno = MPIDI_OFI_send_event(vci, wc, req, MPIDI_OFI_EVENT_SEND_PACK); - break; + case MPIDI_OFI_EVENT_SEND_HUGE: + mpi_errno = send_huge_event(vci, wc, req); + break; - case MPIDI_OFI_EVENT_SEND_NOPACK: - mpi_errno = MPIDI_OFI_send_event(vci, wc, req, MPIDI_OFI_EVENT_SEND_NOPACK); - break; + case MPIDI_OFI_EVENT_SEND_GPU_PIPELINE: + mpi_errno = MPIDI_OFI_gpu_pipeline_send_event(wc, req); + break; - case MPIDI_OFI_EVENT_SSEND_ACK: - mpi_errno = ssend_ack_event(vci, wc, req); - break; + case MPIDI_OFI_EVENT_SEND_PACK: + mpi_errno = MPIDI_OFI_send_event(vci, wc, req, MPIDI_OFI_EVENT_SEND_PACK); + break; - case MPIDI_OFI_EVENT_CHUNK_DONE: - mpi_errno = chunk_done_event(vci, wc, req); - break; + case MPIDI_OFI_EVENT_SEND_NOPACK: + mpi_errno = MPIDI_OFI_send_event(vci, wc, req, MPIDI_OFI_EVENT_SEND_NOPACK); + break; - case MPIDI_OFI_EVENT_HUGE_CHUNK_DONE: - mpi_errno = MPIDI_OFI_huge_chunk_done_event(vci, wc, req); - break; + case MPIDI_OFI_EVENT_SSEND_ACK: + mpi_errno = ssend_ack_event(vci, wc, req); + break; - case MPIDI_OFI_EVENT_INJECT_EMU: - mpi_errno = inject_emu_event(vci, wc, req); - break; + case MPIDI_OFI_EVENT_CHUNK_DONE: + mpi_errno = chunk_done_event(vci, wc, req); + break; - case MPIDI_OFI_EVENT_DYNPROC_DONE: - mpi_errno = dynproc_done_event(vci, wc, req); - break; + case MPIDI_OFI_EVENT_HUGE_CHUNK_DONE: + mpi_errno = MPIDI_OFI_huge_chunk_done_event(vci, wc, req); + break; - case MPIDI_OFI_EVENT_ACCEPT_PROBE: - mpi_errno = accept_probe_event(vci, wc, req); - break; + case MPIDI_OFI_EVENT_INJECT_EMU: + mpi_errno = inject_emu_event(vci, wc, req); + break; - case MPIDI_OFI_EVENT_ABORT: - default: - mpi_errno = MPI_SUCCESS; - MPIR_Assert(0); - break; - } + case MPIDI_OFI_EVENT_DYNPROC_DONE: + mpi_errno = dynproc_done_event(vci, wc, req); + break; + + case MPIDI_OFI_EVENT_ACCEPT_PROBE: + mpi_errno = accept_probe_event(vci, wc, req); + break; + + case MPIDI_OFI_EVENT_AM_SEND: + mpi_errno = am_isend_event(vci, wc, req); + break; + + case MPIDI_OFI_EVENT_AM_SEND_RDMA: + mpi_errno = am_isend_rdma_event(vci, wc, req); + break; + + case MPIDI_OFI_EVENT_AM_SEND_PIPELINE: + mpi_errno = am_isend_pipeline_event(vci, wc, req); + break; + + case 
MPIDI_OFI_EVENT_AM_RECV: + if (wc->flags & FI_RECV) + mpi_errno = am_recv_event(vci, wc, req); + + if (unlikely(wc->flags & FI_MULTI_RECV)) { + MPIDI_OFI_am_repost_request_t *am = (MPIDI_OFI_am_repost_request_t *) req; + mpi_errno = MPIDI_OFI_am_repost_buffer(vci, am->index); + } + break; + + case MPIDI_OFI_EVENT_AM_READ: + mpi_errno = am_read_event(vci, wc, req); + break; + + case MPIDI_OFI_EVENT_ABORT: + default: + mpi_errno = MPI_SUCCESS; + MPIR_Assert(0); + break; } - fn_exit: return mpi_errno; } @@ -770,12 +607,10 @@ int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret) MPIR_STATUS_SET_COUNT(req->status, 0); if ((event_id == MPIDI_OFI_EVENT_RECV_PACK || event_id == MPIDI_OFI_EVENT_GET_HUGE) && - MPIDI_OFI_REQUEST(req, noncontig.pack.pack_buffer)) { - MPL_free(MPIDI_OFI_REQUEST(req, noncontig.pack.pack_buffer)); - } else if (MPIDI_OFI_ENABLE_PT2PT_NOPACK && - event_id == MPIDI_OFI_EVENT_RECV_NOPACK && - MPIDI_OFI_REQUEST(req, noncontig.nopack)) { - MPL_free(MPIDI_OFI_REQUEST(req, noncontig.nopack)); + MPIDI_OFI_REQUEST(req, u.recv.pack_buffer)) { + MPL_free(MPIDI_OFI_REQUEST(req, u.recv.pack_buffer)); + } else if (event_id == MPIDI_OFI_EVENT_RECV_NOPACK) { + MPL_free(MPIDI_OFI_REQUEST(req, u.nopack_recv.iovs)); } MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(req, datatype)); MPIDI_Request_complete_fast(req); diff --git a/src/mpid/ch4/netmod/ofi/ofi_events.h b/src/mpid/ch4/netmod/ofi/ofi_events.h index 036a49b5671..71dfa2c1995 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_events.h +++ b/src/mpid/ch4/netmod/ofi/ofi_events.h @@ -14,31 +14,6 @@ int MPIDI_OFI_rma_done_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * in_req); int MPIDI_OFI_dispatch_function(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * req); -MPL_STATIC_INLINE_PREFIX MPL_gpu_engine_type_t MPIDI_OFI_gpu_get_recv_engine_type(int cvar) -{ - if (cvar == MPIR_CVAR_CH4_OFI_GPU_RECEIVE_ENGINE_TYPE_compute) { - return MPL_GPU_ENGINE_TYPE_COMPUTE; - } else if (cvar == MPIR_CVAR_CH4_OFI_GPU_RECEIVE_ENGINE_TYPE_copy_high_bandwidth) { - return MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH; - } else if (cvar == MPIR_CVAR_CH4_OFI_GPU_RECEIVE_ENGINE_TYPE_copy_low_latency) { - return MPL_GPU_ENGINE_TYPE_COPY_LOW_LATENCY; - } else { - return MPL_GPU_ENGINE_TYPE_LAST; - } -} - -MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_cqe_get_source(struct fi_cq_tagged_entry *wc, bool has_err) -{ - if (MPIDI_OFI_ENABLE_DATA) { - if (unlikely(has_err)) { - return wc->data & ((1 << MPIDI_OFI_IDATA_SRC_BITS) - 1); - } - return wc->data; - } else { - return MPIDI_OFI_init_get_source(wc->tag); - } -} - MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_event(int vci, struct fi_cq_tagged_entry *wc /* unused */ , MPIR_Request * sreq, int event_id) @@ -46,11 +21,12 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_event(int vci, MPIR_FUNC_ENTER; /* free the packing buffers and datatype */ - if ((event_id == MPIDI_OFI_EVENT_SEND_PACK) && - (MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer))) { - MPL_free(MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer)); - } else if (MPIDI_OFI_ENABLE_PT2PT_NOPACK && (event_id == MPIDI_OFI_EVENT_SEND_NOPACK)) { - MPL_free(MPIDI_OFI_REQUEST(sreq, noncontig.nopack)); + if (event_id == MPIDI_OFI_EVENT_SEND_PACK) { + MPIR_Assert(MPIDI_OFI_REQUEST(sreq, u.pack_send.pack_buffer)); + MPL_free(MPIDI_OFI_REQUEST(sreq, u.pack_send.pack_buffer)); + } else if (event_id == MPIDI_OFI_EVENT_SEND_NOPACK) { + MPIR_Assert(MPIDI_OFI_ENABLE_PT2PT_NOPACK); + MPL_free(MPIDI_OFI_REQUEST(sreq, u.nopack_send.iovs)); } 
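+ /* This patch re-keys the request-private fields by operation instead of
+ * by packing mode (u.pack_send, u.nopack_send, u.recv, ...). A rough
+ * sketch of the union implied by the accesses in this file (illustrative
+ * only, not the literal definition):
+ *
+ *   union {
+ *       struct { void *pack_buffer; } pack_send;
+ *       struct { struct iovec *iovs; } nopack_send;
+ *       struct { void *buf; MPI_Aint count; MPI_Datatype datatype;
+ *                void *pack_buffer; ... } recv;
+ *       struct { struct iovec *iovs; } nopack_recv;
+ *       ... pipeline_send, pipeline_recv, huge_send, am_inject_emu ...
+ *   } u;
+ */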
MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype)); @@ -70,6 +46,10 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_recv_event(int vci, struct fi_cq_tagged_e mpi_errno = MPIDI_OFI_recv_huge_event(vci, wc, rreq); goto fn_exit; } + if (wc->data & MPIDI_OFI_IDATA_PIPELINE) { + mpi_errno = MPIDI_OFI_gpu_pipeline_recv_unexp_event(wc, rreq); + goto fn_exit; + } rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true); if (!rreq->status.MPI_ERROR) { rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data); @@ -92,51 +72,54 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_recv_event(int vci, struct fi_cq_tagged_e MPIDI_anysrc_free_partner(rreq); #endif if ((event_id == MPIDI_OFI_EVENT_RECV_PACK || event_id == MPIDI_OFI_EVENT_GET_HUGE) && - (MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer))) { + (MPIDI_OFI_REQUEST(rreq, u.recv.pack_buffer))) { MPI_Aint actual_unpack_bytes; int is_contig; MPL_pointer_attr_t attr; MPI_Aint true_lb, true_extent; - MPIR_Type_get_true_extent_impl(MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype), &true_lb, - &true_extent); - MPIR_Datatype_is_contig(MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype), &is_contig); - void *recv_buf = MPIR_get_contig_ptr(MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf), true_lb); + + void *pack_buffer = MPIDI_OFI_REQUEST(rreq, u.recv.pack_buffer); + void *buf = MPIDI_OFI_REQUEST(rreq, u.recv.buf); + MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, u.recv.datatype); + + MPIR_Type_get_true_extent_impl(datatype, &true_lb, &true_extent); + MPIR_Datatype_is_contig(datatype, &is_contig); + void *recv_buf = MPIR_get_contig_ptr(buf, true_lb); + MPIR_GPU_query_pointer_attr(recv_buf, &attr); MPL_gpu_engine_type_t engine = MPIDI_OFI_gpu_get_recv_engine_type(MPIR_CVAR_CH4_OFI_GPU_RECEIVE_ENGINE_TYPE); if (is_contig && engine != MPL_GPU_ENGINE_TYPE_LAST && MPL_gpu_query_pointer_is_dev(recv_buf, &attr)) { actual_unpack_bytes = wc->len; - mpi_errno = - MPIR_Localcopy_gpu(MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer), count, - MPI_BYTE, 0, NULL, recv_buf, count, MPI_BYTE, 0, &attr, - MPL_GPU_COPY_DIRECTION_NONE, engine, true); + mpi_errno = MPIR_Localcopy_gpu(pack_buffer, count, MPI_BYTE, 0, NULL, + recv_buf, count, MPI_BYTE, 0, &attr, + MPL_GPU_COPY_DIRECTION_NONE, engine, true); MPIR_ERR_CHECK(mpi_errno); } else { - MPIR_Typerep_unpack(MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer), count, - MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf), - MPIDI_OFI_REQUEST(rreq, noncontig.pack.count), - MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype), 0, + MPI_Aint recv_count = MPIDI_OFI_REQUEST(rreq, u.recv.count); + MPIR_Typerep_unpack(pack_buffer, count, buf, recv_count, datatype, 0, &actual_unpack_bytes, MPIR_TYPEREP_FLAG_NONE); } - MPL_free(MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer)); + MPL_free(pack_buffer); if (actual_unpack_bytes != (MPI_Aint) count) { rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, __FUNCTION__, __LINE__, MPI_ERR_TYPE, "**dtypemismatch", 0); } - } else if (MPIDI_OFI_ENABLE_PT2PT_NOPACK && (event_id == MPIDI_OFI_EVENT_RECV_NOPACK) && - (MPIDI_OFI_REQUEST(rreq, noncontig.nopack))) { - MPI_Count elements; + } else if (event_id == MPIDI_OFI_EVENT_RECV_NOPACK) { + MPIR_Assert(MPIDI_OFI_ENABLE_PT2PT_NOPACK); + MPIR_Assert(MPIDI_OFI_REQUEST(rreq, u.nopack_recv.iovs)); + MPI_Count elements; /* Check to see if there are any bytes that don't fit into the datatype basic elements */ MPI_Count count_x = count; /* need a MPI_Count variable (consider 32-bit OS) */ 
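 /* MPIR_Get_elements_x_impl decomposes the received byte count into basic
 * elements of the datatype; per the check below, count_x is left holding
 * any remainder bytes that do not fit into whole elements. */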
MPIR_Get_elements_x_impl(&count_x, MPIDI_OFI_REQUEST(rreq, datatype), &elements); if (count_x) MPIR_ERR_SET(rreq->status.MPI_ERROR, MPI_ERR_TYPE, "**dtypemismatch"); - MPL_free(MPIDI_OFI_REQUEST(rreq, noncontig.nopack)); + MPL_free(MPIDI_OFI_REQUEST(rreq, u.nopack_recv.iovs)); } MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(rreq, datatype)); diff --git a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c new file mode 100644 index 00000000000..81546f687d9 --- /dev/null +++ b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c @@ -0,0 +1,576 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#include "mpidimpl.h" +#include "ofi_impl.h" +#include "mpir_async_things.h" + +struct chunk_req { + char pad[MPIDI_REQUEST_HDR_SIZE]; + struct fi_context context[MPIDI_OFI_CONTEXT_STRUCTS]; /* fixed field, do not move */ + int event_id; /* fixed field, do not move */ + MPIR_Request *parent; /* Parent request */ + void *buf; +}; + +struct pipeline_header { + int n_chunks; + int pipeline_tag; +}; + +static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq, + const void *buf, MPI_Aint chunk_sz); +static int start_recv_chunk(MPIR_Request * rreq, int n_chunks); +static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz, + void *recv_buf, MPI_Aint count, MPI_Datatype datatype); + +/* ------------------------------------ + * send_alloc: allocate send chunks and start copy, may postpone as async + */ +struct send_alloc { + MPIR_Request *sreq; + const void *send_buf; + MPI_Aint count; + MPI_Datatype datatype; + MPL_pointer_attr_t attr; + MPI_Aint offset, left_sz, chunk_sz; + int n_chunks; +}; + +static int send_alloc_poll(MPIR_Async_thing * thing); + +int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, + MPI_Aint count, MPI_Datatype datatype, + MPL_pointer_attr_t attr, MPI_Aint data_sz, + uint64_t cq_data, fi_addr_t remote_addr, + int vci_local, int ctx_idx, uint64_t match_bits, int pipeline_tag) +{ + int mpi_errno = MPI_SUCCESS; + + uint32_t n_chunks = 0; + MPI_Aint chunk_sz = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ; + if (data_sz <= chunk_sz) { + /* data fits in a single chunk */ + chunk_sz = data_sz; + n_chunks = 1; + } else { + n_chunks = data_sz / chunk_sz; + if (data_sz % chunk_sz > 0) { + n_chunks++; + } + } + + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) = n_chunks; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.cq_data) = cq_data; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.remote_addr) = remote_addr; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.vci_local) = vci_local; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.ctx_idx) = ctx_idx; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.match_bits) = match_bits; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.pipeline_tag) = pipeline_tag; + + struct pipeline_header hdr; + hdr.n_chunks = n_chunks; + hdr.pipeline_tag = pipeline_tag; + + /* Send the initial empty packet for matching */ + MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx, + &hdr, sizeof(hdr), cq_data | MPIDI_OFI_IDATA_PIPELINE, + remote_addr, match_bits), vci_local, tinjectdata); + + struct send_alloc *p; + p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER); + MPIR_Assert(p); + + p->sreq = sreq; + p->send_buf = send_buf; + p->count = count; + p->datatype = datatype; + p->attr = attr; + p->left_sz = data_sz; + p->chunk_sz = chunk_sz; + p->offset = 0; + p->n_chunks = 0; + + mpi_errno = MPIR_Async_things_add(send_alloc_poll, p); + /* 
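+ * send_alloc_poll is driven from MPIR_Async_things progress until it
+ * reports completion. The poll contract, as used throughout this file:
+ * return MPIR_ASYNC_THING_DONE when the state can be freed,
+ * MPIR_ASYNC_THING_UPDATED after partial progress (e.g. some chunks
+ * issued before the buffer pool ran dry), and
+ * MPIR_ASYNC_THING_NOPROGRESS otherwise.
+ *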
TODO: kick the progress right away */ + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +static int send_alloc_poll(MPIR_Async_thing * thing) +{ + int num_new_chunks = 0; + struct send_alloc *p = MPIR_Async_thing_get_state(thing); + + while (p->left_sz > 0) { + void *host_buf; + MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, &host_buf); + if (host_buf == NULL) { + return (num_new_chunks == 0) ? MPIR_ASYNC_THING_NOPROGRESS : MPIR_ASYNC_THING_UPDATED; + } + MPIR_async_req async_req; + MPI_Aint chunk_sz = MPL_MIN(p->left_sz, p->chunk_sz); + MPL_gpu_engine_type_t engine_type = + MPIDI_OFI_gpu_get_send_engine_type(MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE); + int commit = p->left_sz <= chunk_sz ? 1 : 0; + if (!commit && + !MPIR_CVAR_GPU_USE_IMMEDIATE_COMMAND_LIST && + p->n_chunks % MPIR_CVAR_CH4_OFI_GPU_PIPELINE_NUM_BUFFERS_PER_CHUNK == + MPIR_CVAR_CH4_OFI_GPU_PIPELINE_NUM_BUFFERS_PER_CHUNK - 1) + commit = 1; + int mpi_errno; + mpi_errno = MPIR_Ilocalcopy_gpu(p->send_buf, p->count, p->datatype, + p->offset, &p->attr, host_buf, chunk_sz, + MPI_BYTE, 0, NULL, MPL_GPU_COPY_D2H, engine_type, + commit, &async_req); + MPIR_Assertp(mpi_errno == MPI_SUCCESS); + + p->offset += (size_t) chunk_sz; + p->left_sz -= (size_t) chunk_sz; + p->n_chunks++; + + spawn_send_copy(thing, p->sreq, &async_req, host_buf, chunk_sz); + + num_new_chunks++; + } + /* all done */ + MPL_free(p); + return MPIR_ASYNC_THING_DONE; +}; + +/* ------------------------------------ + * send_copy: async copy before sending the chunk data + */ +struct send_copy { + MPIR_Request *sreq; + /* async handle */ + MPIR_async_req async_req; + /* for sending data */ + const void *buf; + MPI_Aint chunk_sz; +}; + +static int send_copy_poll(MPIR_Async_thing * thing); +static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint chunk_sz); + +static void spawn_send_copy(MPIR_Async_thing * thing, + MPIR_Request * sreq, MPIR_async_req * areq, + const void *buf, MPI_Aint chunk_sz) +{ + struct send_copy *p; + p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER); + MPIR_Assert(p); + + p->sreq = sreq; + p->async_req = *areq; + p->buf = buf; + p->chunk_sz = chunk_sz; + + MPIR_Async_thing_spawn(thing, send_copy_poll, p); +} + +static int send_copy_poll(MPIR_Async_thing * thing) +{ + int is_done = 0; + + struct send_copy *p = MPIR_Async_thing_get_state(thing); + MPIR_async_test(&(p->async_req), &is_done); + + if (is_done) { + /* finished copy, go ahead send the data */ + send_copy_complete(p->sreq, p->buf, p->chunk_sz); + MPL_free(p); + return MPIR_ASYNC_THING_DONE; + } + + return MPIR_ASYNC_THING_NOPROGRESS; +} + +static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint chunk_sz) +{ + int mpi_errno = MPI_SUCCESS; + int vci_local = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.vci_local); + + struct chunk_req *chunk_req = MPL_malloc(sizeof(struct chunk_req), MPL_MEM_BUFFER); + MPIR_Assertp(chunk_req); + + chunk_req->parent = sreq; + chunk_req->event_id = MPIDI_OFI_EVENT_SEND_GPU_PIPELINE; + chunk_req->buf = (void *) buf; + + int ctx_idx = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.ctx_idx); + fi_addr_t remote_addr = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.remote_addr); + uint64_t cq_data = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.cq_data); + uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.pipeline_tag) | + MPIDI_OFI_GPU_PIPELINE_SEND; + MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci_local).lock); + MPIDI_OFI_CALL_RETRY(fi_tsenddata(MPIDI_OFI_global.ctx[ctx_idx].tx, + buf, chunk_sz, NULL /* desc 
*/ , + cq_data, remote_addr, match_bits, + (void *) &chunk_req->context), vci_local, fi_tsenddata); + MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci_local).lock); + /* both send buffer and chunk_req will be freed in pipeline_send_event */ + + return; + fn_fail: + MPIR_Assert(0); +} + +/* ------------------------------------ + * send_event: callback for MPIDI_OFI_dispatch_function in ofi_events.c + */ +int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r) +{ + int mpi_errno = MPI_SUCCESS; + + struct chunk_req *chunk_req = (void *) r; + MPIR_Request *sreq = chunk_req->parent; + void *host_buf = chunk_req->buf; + MPL_free(chunk_req); + + MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, host_buf); + + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) -= 1; + if (MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) == 0) { + MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype)); + MPIDI_Request_complete_fast(sreq); + } + + return mpi_errno; +} + +/* ------------------------------------ + * recv_alloc: allocate recv chunk buffer and post fi_trecv + * There are actually two async things: issuing the initial recv and + * issuing recvs for the rest of the chunks. + */ + +/* the state for recv_init_alloc is just the rreq */ + +static bool issue_recv_alloc(MPIR_Request * rreq, bool is_init); +static int recv_init_alloc_poll(MPIR_Async_thing * thing); + +int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq, + void *recv_buf, MPI_Aint count, MPI_Datatype datatype, + fi_addr_t remote_addr, int vci_local, + uint64_t match_bits, uint64_t mask_bits, + MPI_Aint data_sz, int ctx_idx) +{ + int mpi_errno = MPI_SUCCESS; + + /* The 1st recv is an empty chunk for matching. We need to initialize rreq. */ + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset) = 0; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) = 0; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = 0; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync) = false; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.remote_addr) = remote_addr; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.vci_local) = vci_local; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.match_bits) = match_bits; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.mask_bits) = mask_bits; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.ctx_idx) = ctx_idx; + + /* Save original buf, datatype and count */ + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.buf) = recv_buf; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.count) = count; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.datatype) = datatype; + + mpi_errno = MPIR_Async_things_add(recv_init_alloc_poll, rreq); + + return mpi_errno; +} + +static int recv_init_alloc_poll(MPIR_Async_thing * thing) +{ + MPIR_Request *rreq = MPIR_Async_thing_get_state(thing); + + int ret = issue_recv_alloc(rreq, true /* is_init */); + if (ret) { + return MPIR_ASYNC_THING_DONE; + } + + return MPIR_ASYNC_THING_NOPROGRESS; +} + +/* ---- */ +struct recv_chunk_alloc { + MPIR_Request *rreq; + int n_chunks; + int issued_chunks; +}; + +static int recv_chunk_alloc_poll(MPIR_Async_thing * thing); + +/* this is called from recv_event */ +static int start_recv_chunk(MPIR_Request * rreq, int n_chunks) +{ + int mpi_errno = MPI_SUCCESS; + + struct recv_chunk_alloc *p; + p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER); + MPIR_Assert(p); + + p->rreq = rreq; + p->n_chunks = n_chunks; + p->issued_chunks = 0; + + mpi_errno = MPIR_Async_things_add(recv_chunk_alloc_poll, p); + + return mpi_errno; +} + +static int recv_chunk_alloc_poll(MPIR_Async_thing * 
thing) +{ + struct recv_chunk_alloc *p = MPIR_Async_thing_get_state(thing); + MPIR_Request *rreq = p->rreq; + + /* arbitrary threshold */ + if (MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) > 1) { + return MPIR_ASYNC_THING_NOPROGRESS; + } + + bool ret = issue_recv_alloc(rreq, false /* is_init */); + if (ret) { + p->issued_chunks++; + if (p->issued_chunks == p->n_chunks) { + MPL_free(p); + return MPIR_ASYNC_THING_DONE; + } else { + return MPIR_ASYNC_THING_UPDATED; + } + } + + return MPIR_ASYNC_THING_NOPROGRESS; +} + +/* ---- */ +static bool issue_recv_alloc(MPIR_Request * rreq, bool is_init) +{ + void *host_buf; + MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, &host_buf); + if (!host_buf) { + return MPIR_ASYNC_THING_NOPROGRESS; + } + + fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.remote_addr); + int ctx_idx = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.ctx_idx); + int vci = MPIDI_Request_get_vci(rreq); + uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.mask_bits); + + struct chunk_req *chunk_req; + chunk_req = MPL_malloc(sizeof(*chunk_req), MPL_MEM_BUFFER); + MPIR_Assert(chunk_req); + + chunk_req->parent = rreq; + chunk_req->buf = host_buf; + + uint64_t match_bits; + if (is_init) { + match_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.match_bits); + chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT; + } else { + match_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.pipeline_tag) | + MPIDI_OFI_GPU_PIPELINE_SEND; + chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE; + } + MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci).lock); + int ret = fi_trecv(MPIDI_OFI_global.ctx[ctx_idx].rx, + host_buf, MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ, NULL, remote_addr, + match_bits, mask_bits, (void *) &chunk_req->context); + MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci).lock); + if (ret == 0) { + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) += 1; + /* chunk_req and host_buf will be freed in recv_events */ + return true; + } + if (ret != -FI_EAGAIN && ret != -FI_ENOMEM) { + /* unexpected error */ + MPIR_Assert(0); + } + MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf); + MPL_free(chunk_req); + return false; +}; + +/* ------------------------------------ + * recv_event: callback for MPIDI_OFI_dispatch_function in ofi_events.c + */ +int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r) +{ + int mpi_errno = MPI_SUCCESS; + + struct chunk_req *chunk_req = (void *) r; + int event_id = chunk_req->event_id; + MPIR_Request *rreq = chunk_req->parent; + void *host_buf = chunk_req->buf; + + MPL_free(chunk_req); + + void *recv_buf = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.buf); + size_t recv_count = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.count); + MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.datatype); + + if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) { + rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true); + rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data); + rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag); + + if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) { + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync) = true; + } + + bool is_pipeline = (wc->data & MPIDI_OFI_IDATA_PIPELINE); + if (!is_pipeline) { + /* message from a normal send */ + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = 1; + mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); + MPIR_ERR_CHECK(mpi_errno); + } else { + 
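+ /* The sender announced pipelining by setting MPIDI_OFI_IDATA_PIPELINE
+ * in the cq data and shipping a small header instead of payload in
+ * the init packet (see the definition at the top of this file):
+ *
+ *   struct pipeline_header { int n_chunks; int pipeline_tag; };
+ */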
struct pipeline_header *p_hdr = host_buf; + MPIR_Assert(p_hdr->n_chunks > 0); + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = p_hdr->n_chunks; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.pipeline_tag) = p_hdr->pipeline_tag; + /* There is no data in the init chunk, free the buffer */ + MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf); + /* Post recv for the remaining chunks. */ + mpi_errno = start_recv_chunk(rreq, p_hdr->n_chunks); + MPIR_ERR_CHECK(mpi_errno); + } + } else { + MPIR_Assert(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE); + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) -= 1; + mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); + MPIR_ERR_CHECK(mpi_errno); + } + + fn_exit: + return mpi_errno; + fn_fail: + rreq->status.MPI_ERROR = mpi_errno; + goto fn_exit; +} + +/* We didn't expect the pipeline protocol (e.g. we are receiving into a host buffer), but + * the sender used it (e.g. sending from a large GPU buffer). + * We need to convert the rreq, i.e. prepare the u.pipeline_recv data. + */ +int MPIDI_OFI_gpu_pipeline_recv_unexp_event(struct fi_cq_tagged_entry *wc, MPIR_Request * rreq) +{ + /* Since we didn't store the information (ref. u.pipeline_recv), + * we can't issue fi_trecv for all the chunks. Just assert fail for now. + */ + MPIR_Assertp(0); + return MPI_SUCCESS; +} + +/* ------------------------------------ + * recv_copy: async copy from host_buf to user buffer in recv event + */ +struct recv_copy { + MPIR_Request *rreq; + /* async handle */ + MPIR_async_req async_req; + /* for cleanups */ + void *buf; +}; + +static int recv_copy_poll(MPIR_Async_thing * thing); +static void recv_copy_complete(MPIR_Request * rreq, void *buf); + +static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz, + void *recv_buf, MPI_Aint count, MPI_Datatype datatype) +{ + int mpi_errno = MPI_SUCCESS; + + MPI_Aint offset = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset); + int engine_type = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_H2D_ENGINE_TYPE; + + /* FIXME: current design unpacks all bytes from host buffer, overflow check is missing. 
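+ * A bounds check along these lines is what is missing (a sketch; the
+ * receive-buffer size in bytes is derived from count and datatype):
+ *
+ *   MPI_Aint type_sz;
+ *   MPIR_Datatype_get_size_macro(datatype, type_sz);
+ *   MPIR_Assert(offset + chunk_sz <= type_sz * count);
+ *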
*/ + MPIR_async_req async_req; + mpi_errno = MPIR_Ilocalcopy_gpu(buf, chunk_sz, MPI_BYTE, 0, NULL, + recv_buf, count, datatype, offset, NULL, + MPL_GPU_COPY_H2D, engine_type, 1, &async_req); + MPIR_ERR_CHECK(mpi_errno); + + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset) += chunk_sz; + + struct recv_copy *p; + p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER); + MPIR_Assert(p); + + p->rreq = rreq; + p->async_req = async_req; + p->buf = buf; + + mpi_errno = MPIR_Async_things_add(recv_copy_poll, p); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +static int recv_copy_poll(MPIR_Async_thing * thing) +{ + int is_done = 0; + + struct recv_copy *p = MPIR_Async_thing_get_state(thing); + MPIR_async_test(&(p->async_req), &is_done); + + if (is_done) { + recv_copy_complete(p->rreq, p->buf); + MPL_free(p); + return MPIR_ASYNC_THING_DONE; + } + + return MPIR_ASYNC_THING_NOPROGRESS; +} + +static void recv_copy_complete(MPIR_Request * rreq, void *buf) +{ + int mpi_errno = MPI_SUCCESS; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) -= 1; + if (MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) == 0) { + /* all chunks arrived and copied */ + if (unlikely(MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync))) { + MPIR_Comm *comm = rreq->comm; + uint64_t ss_bits = + MPIDI_OFI_init_sendtag(MPL_atomic_relaxed_load_int + (&MPIDI_OFI_REQUEST(rreq, util_id)), + MPIR_Comm_rank(comm), rreq->status.MPI_TAG, + MPIDI_OFI_SYNC_SEND_ACK); + int r = rreq->status.MPI_SOURCE; + int vci_src = MPIDI_get_vci(SRC_VCI_FROM_RECVER, comm, r, comm->rank, + rreq->status.MPI_TAG); + int vci_dst = MPIDI_get_vci(DST_VCI_FROM_RECVER, comm, r, comm->rank, + rreq->status.MPI_TAG); + int vci_local = vci_dst; + int vci_remote = vci_src; + int nic = 0; + int ctx_idx = MPIDI_OFI_get_ctx_index(vci_local, nic); + fi_addr_t dest_addr = MPIDI_OFI_comm_to_phys(comm, r, nic, vci_remote); + MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci_local).lock); + MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx, NULL /* buf */ , + 0 /* len */ , + MPIR_Comm_rank(comm), dest_addr, ss_bits), + vci_local, tinjectdata); + MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci_local).lock); + } + + MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(rreq, datatype)); + /* Set number of bytes in status. */ + MPIR_STATUS_SET_COUNT(rreq->status, MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset)); + + MPIDI_Request_complete_fast(rreq); + } + + /* Free host buffer, yaksa request and task. 
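+ * (With this rework only the chunk's host buffer is left to release
+ * here: the async copy completed in recv_copy_poll, and the recv_copy
+ * state struct is freed there.)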
*/ + MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, buf); + return; + fn_fail: + MPIR_Assertp(0); +} diff --git a/src/mpid/ch4/netmod/ofi/ofi_huge.c b/src/mpid/ch4/netmod/ofi/ofi_huge.c index 60f7efe7e93..e8377f83dfb 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_huge.c +++ b/src/mpid/ch4/netmod/ofi/ofi_huge.c @@ -14,7 +14,7 @@ static int get_huge_complete(MPIR_Request * rreq); static int get_huge(MPIR_Request * rreq) { int mpi_errno = MPI_SUCCESS; - MPIDI_OFI_huge_remote_info_t *info = MPIDI_OFI_REQUEST(rreq, huge.remote_info); + MPIDI_OFI_huge_remote_info_t *info = MPIDI_OFI_REQUEST(rreq, u.recv.remote_info); MPI_Aint cur_offset; if (MPIDI_OFI_COMM(rreq->comm).enable_striping) { @@ -23,7 +23,7 @@ static int get_huge(MPIR_Request * rreq) cur_offset = MPIDI_OFI_global.max_msg_size; } - MPI_Aint data_sz = MPIDI_OFI_REQUEST(rreq, util.iov.iov_len); + MPI_Aint data_sz = MPIDI_OFI_REQUEST(rreq, u.recv.msg_iov.iov_len); if (data_sz < info->msgsize) { rreq->status.MPI_ERROR = MPI_ERR_TRUNCATE; @@ -57,7 +57,7 @@ static uintptr_t recv_rbase(MPIDI_OFI_huge_remote_info_t * remote_info) static int get_huge_issue_read(MPIR_Request * rreq) { int mpi_errno = MPI_SUCCESS; - MPIDI_OFI_huge_remote_info_t *info = MPIDI_OFI_REQUEST(rreq, huge.remote_info); + MPIDI_OFI_huge_remote_info_t *info = MPIDI_OFI_REQUEST(rreq, u.recv.remote_info); MPIR_Comm *comm = rreq->comm; MPIR_FUNC_ENTER; @@ -69,7 +69,7 @@ static int get_huge_issue_read(MPIR_Request * rreq) } bytesLeft = info->msgsize - cur_offset; - void *recv_buf = MPIDI_OFI_REQUEST(rreq, util.iov.iov_base); + void *recv_buf = MPIDI_OFI_REQUEST(rreq, u.recv.msg_iov.iov_base); MPI_Aint chunk_size; if (MPIDI_OFI_COMM(comm).enable_striping) { @@ -142,7 +142,7 @@ static int get_huge_complete(MPIR_Request * rreq) int mpi_errno = MPI_SUCCESS; MPIR_FUNC_ENTER; - MPIDI_OFI_huge_remote_info_t *info = MPIDI_OFI_REQUEST(rreq, huge.remote_info); + MPIDI_OFI_huge_remote_info_t *info = MPIDI_OFI_REQUEST(rreq, u.recv.remote_info); /* note: it's receiver ack sender */ int vci_remote = info->vci_src; @@ -193,7 +193,7 @@ int MPIDI_OFI_recv_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Reque comm_ptr = rreq->comm; MPIR_T_PVAR_COUNTER_INC(MULTINIC, nic_recvd_bytes_count[MPIDI_OFI_REQUEST(rreq, nic_num)], wc->len); - if (MPIDI_OFI_REQUEST(rreq, huge.remote_info)) { + if (MPIDI_OFI_REQUEST(rreq, u.recv.remote_info)) { /* this is mrecv, we already got remote info */ ready_to_get = true; } else { @@ -205,7 +205,7 @@ int MPIDI_OFI_recv_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Reque LL_FOREACH(MPIDI_OFI_global.per_vci[vci].huge_ctrl_head, list_ptr) { if (list_ptr->comm_id == comm_id && list_ptr->rank == rank && list_ptr->tag == tag) { - MPIDI_OFI_REQUEST(rreq, huge.remote_info) = list_ptr->u.info; + MPIDI_OFI_REQUEST(rreq, u.recv.remote_info) = list_ptr->u.info; LL_DELETE(MPIDI_OFI_global.per_vci[vci].huge_ctrl_head, MPIDI_OFI_global.per_vci[vci].huge_ctrl_tail, list_ptr); MPL_free(list_ptr); @@ -287,12 +287,12 @@ int MPIDI_OFI_recv_huge_control(int vci, MPIR_Context_id_t comm_id, int rank, in /* let MPIDI_OFI_recv_huge_event finish the recv */ } else if (MPIDI_OFI_REQUEST(rreq, kind) == MPIDI_OFI_req_kind__mprobe) { /* attach info and finish the mprobe */ - MPIDI_OFI_REQUEST(rreq, huge.remote_info) = info; + MPIDI_OFI_REQUEST(rreq, u.recv.remote_info) = info; MPIR_STATUS_SET_COUNT(rreq->status, info->msgsize); MPL_atomic_release_store_int(&(MPIDI_OFI_REQUEST(rreq, util_id)), MPIDI_OFI_PEEK_FOUND); } else { /* attach info and finish recv 
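 * (remote_info arrived in the sender's control message; get_huge() checks
 * for truncation against the posted buffer and pulls the payload with
 * RDMA reads, after which the receiver acks the sender)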
  */
-        MPIDI_OFI_REQUEST(rreq, huge.remote_info) = info;
+        MPIDI_OFI_REQUEST(rreq, u.recv.remote_info) = info;
         mpi_errno = get_huge(rreq);
         MPIR_ERR_CHECK(mpi_errno);
     }
@@ -328,7 +328,7 @@ int MPIDI_OFI_peek_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Reque
     }
     if (found_msg) {
         if (MPIDI_OFI_REQUEST(rreq, kind) == MPIDI_OFI_req_kind__mprobe) {
-            MPIDI_OFI_REQUEST(rreq, huge.remote_info) = list_ptr->u.info;
+            MPIDI_OFI_REQUEST(rreq, u.recv.remote_info) = list_ptr->u.info;
             LL_DELETE(MPIDI_OFI_global.per_vci[vci].huge_ctrl_head,
                       MPIDI_OFI_global.per_vci[vci].huge_ctrl_tail, list_ptr);
             MPL_free(list_ptr);
diff --git a/src/mpid/ch4/netmod/ofi/ofi_impl.h b/src/mpid/ch4/netmod/ofi/ofi_impl.h
index 493980377f5..3bb2852160e 100644
--- a/src/mpid/ch4/netmod/ofi/ofi_impl.h
+++ b/src/mpid/ch4/netmod/ofi/ofi_impl.h
@@ -595,7 +595,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_multx_receiver_nic_index(MPIR_Comm * comm
     return nic_idx;
 }
 
-/* cq bufferring routines --
+/* cq buffering routines --
  * in particular, when we encounter EAGAIN error during progress, such as during
  * active message handling, recursively calling progress may result in unpredictable
  * behaviors (e.g. stack overflow). Thus we need use the cq buffering to avoid
@@ -827,318 +827,56 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_gpu_free_pack_buffer(void *ptr)
     }
 }
 
-MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_gpu_pipeline_chunk_size(size_t data_sz)
+MPL_STATIC_INLINE_PREFIX MPL_gpu_engine_type_t MPIDI_OFI_gpu_get_send_engine_type(int cvar)
 {
-    int chunk_size = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ;
-    if (data_sz <= MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ) {
-        chunk_size = data_sz;
-    }
-    return chunk_size;
-}
-
-MPL_STATIC_INLINE_PREFIX MPIDI_OFI_gpu_task_t *MPIDI_OFI_create_gpu_task(MPIDI_OFI_pipeline_type_t
-                                                                         type, void *buf,
-                                                                         size_t len,
-                                                                         MPIR_Request * request,
-                                                                         MPIR_gpu_req yreq)
-{
-    MPIDI_OFI_gpu_task_t *task =
-        (MPIDI_OFI_gpu_task_t *) MPL_malloc(sizeof(MPIDI_OFI_gpu_task_t), MPL_MEM_OTHER);
-    MPIR_Assert(task != NULL);
-    task->type = type;
-    task->status = MPIDI_OFI_PIPELINE_READY;
-    task->buf = buf;
-    task->len = len;
-    task->request = request;
-    task->yreq = yreq;
-    task->prev = NULL;
-    task->next = NULL;
-    return task;
-}
-
-MPL_STATIC_INLINE_PREFIX MPIDI_OFI_gpu_pending_recv_t
-    * MPIDI_OFI_create_recv_task(MPIDI_OFI_gpu_pipeline_request * req, int idx, int n_chunks)
-{
-    MPIDI_OFI_gpu_pending_recv_t *task =
-        (MPIDI_OFI_gpu_pending_recv_t *) MPL_malloc(sizeof(MPIDI_OFI_gpu_pending_recv_t),
-                                                    MPL_MEM_OTHER);
-    MPIR_Assert(task);
-    task->req = req;
-    task->idx = idx;
-    task->n_chunks = n_chunks;
-    task->prev = NULL;
-    task->next = NULL;
-    return task;
-}
-
-MPL_STATIC_INLINE_PREFIX MPIDI_OFI_gpu_pending_send_t *MPIDI_OFI_create_send_task(MPIR_Request *
-                                                                                  req,
-                                                                                  void *send_buf,
-                                                                                  MPL_pointer_attr_t
-                                                                                  attr,
-                                                                                  MPI_Aint left_sz,
-                                                                                  MPI_Aint count,
-                                                                                  int dt_contig)
-{
-    MPIDI_OFI_gpu_pending_send_t *task =
-        (MPIDI_OFI_gpu_pending_send_t *) MPL_malloc(sizeof(MPIDI_OFI_gpu_pending_send_t),
-                                                    MPL_MEM_OTHER);
-    MPIR_Assert(task);
-    task->sreq = req;
-    task->attr = attr;
-    task->send_buf = send_buf;
-    task->offset = 0;
-    task->n_chunks = 0;
-    task->left_sz = left_sz;
-    task->count = count;
-    task->dt_contig = dt_contig;
-    task->prev = NULL;
-    task->next = NULL;
-    return task;
-}
-
-static int MPIDI_OFI_gpu_progress_task(MPIDI_OFI_gpu_task_t * gpu_queue[], int vni);
-
-static int MPIDI_OFI_gpu_progress_send(void)
-{
-    int mpi_errno = MPI_SUCCESS;
-    int engine_type = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_D2H_ENGINE_TYPE;
-
-    while (MPIDI_OFI_global.gpu_send_queue) {
-        char *host_buf = NULL;
-        MPI_Aint chunk_sz;
-        int vci_local = -1;
-
-        MPIDI_OFI_gpu_pending_send_t *send_task = MPIDI_OFI_global.gpu_send_queue;
-        MPI_Datatype datatype = MPIDI_OFI_REQUEST(send_task->sreq, datatype);
-        int block_sz = MPIDI_OFI_REQUEST(send_task->sreq, pipeline_info.chunk_sz);
-        while (send_task->left_sz > 0) {
-            MPIDI_OFI_gpu_task_t *task = NULL;
-            chunk_sz = send_task->left_sz > block_sz ? block_sz : send_task->left_sz;
-            host_buf = NULL;
-            MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.gpu_pipeline_send_pool,
-                                               (void **) &host_buf);
-            if (host_buf == NULL) {
-                goto fn_exit;
-            }
-            MPI_Aint actual_pack_bytes;
-            MPIR_gpu_req yreq;
-            int commit = send_task->left_sz <= chunk_sz ? 1 : 0;
-            if (!commit &&
-                !MPIR_CVAR_GPU_USE_IMMEDIATE_COMMAND_LIST &&
-                send_task->n_chunks % MPIR_CVAR_CH4_OFI_GPU_PIPELINE_NUM_BUFFERS_PER_CHUNK ==
-                MPIR_CVAR_CH4_OFI_GPU_PIPELINE_NUM_BUFFERS_PER_CHUNK - 1)
-                commit = 1;
-            mpi_errno =
-                MPIR_Ilocalcopy_gpu((char *) send_task->send_buf, send_task->count, datatype,
-                                    send_task->offset, &send_task->attr, host_buf, chunk_sz,
-                                    MPI_BYTE, 0, NULL, MPL_GPU_COPY_D2H, engine_type,
-                                    commit, &yreq);
-            MPIR_ERR_CHECK(mpi_errno);
-            actual_pack_bytes = chunk_sz;
-            task =
-                MPIDI_OFI_create_gpu_task(MPIDI_OFI_PIPELINE_SEND, host_buf, actual_pack_bytes,
-                                          send_task->sreq, yreq);
-            send_task->offset += (size_t) actual_pack_bytes;
-            send_task->left_sz -= (size_t) actual_pack_bytes;
-            vci_local = MPIDI_OFI_REQUEST(send_task->sreq, pipeline_info.vci_local);
-            MPIR_Assert(vci_local < MPIDI_CH4_MAX_VCIS);
-            DL_APPEND(MPIDI_OFI_global.gpu_send_task_queue[vci_local], task);
-            send_task->n_chunks++;
-            /* Increase request completion cnt, cc is 1 more than necessary
-             * to prevent parent request being freed prematurally. */
-            MPIR_cc_inc(send_task->sreq->cc_ptr);
-        }
-        /* all done, decrease cc by 1 to allow parent request to be freed
-         * when complete */
-        MPIR_cc_dec(send_task->sreq->cc_ptr);
-        /* Update correct number of chunks in immediate data. */
-        MPIDI_OFI_idata_set_gpuchunk_bits(&MPIDI_OFI_REQUEST
-                                          (send_task->sreq, pipeline_info.cq_data),
-                                          send_task->n_chunks);
-        DL_DELETE(MPIDI_OFI_global.gpu_send_queue, send_task);
-        MPL_free(send_task);
-
-        if (vci_local != -1)
-            MPIDI_OFI_gpu_progress_task(MPIDI_OFI_global.gpu_send_task_queue, vci_local);
-
+    if (cvar == MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE_compute) {
+        return MPL_GPU_ENGINE_TYPE_COMPUTE;
+    } else if (cvar == MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE_copy_high_bandwidth) {
+        return MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH;
+    } else if (cvar == MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE_copy_low_latency) {
+        return MPL_GPU_ENGINE_TYPE_COPY_LOW_LATENCY;
+    } else {
+        return MPL_GPU_ENGINE_TYPE_LAST;
     }
-
-  fn_exit:
-    return mpi_errno;
-  fn_fail:
-    mpi_errno = MPI_ERR_OTHER;
-    goto fn_exit;
 }
 
-MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_gpu_progress_recv(void)
+MPL_STATIC_INLINE_PREFIX MPL_gpu_engine_type_t MPIDI_OFI_gpu_get_recv_engine_type(int cvar)
 {
-    int mpi_errno = MPI_SUCCESS;
-
-    while (MPIDI_OFI_global.gpu_recv_queue) {
-        MPIDI_OFI_gpu_pending_recv_t *recv_task = MPIDI_OFI_global.gpu_recv_queue;
-        MPIDI_OFI_gpu_pipeline_request *chunk_req = recv_task->req;
-        MPIR_Request *rreq = chunk_req->parent;
-        void *host_buf = chunk_req->buf;
-        if (!host_buf) {
-            MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool,
-                                               (void **) &host_buf);
-            if (!host_buf) {
-                break;
-            }
-            chunk_req->buf = host_buf;
-        }
-        fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, pipeline_info.remote_addr);
-
-        int ret = fi_trecv(MPIDI_OFI_global.ctx[MPIDI_OFI_REQUEST(rreq, pipeline_info.ctx_idx)].rx,
-                           (void *) host_buf,
-                           MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ, NULL, remote_addr,
-                           MPIDI_OFI_REQUEST(rreq,
-                                             pipeline_info.match_bits) |
-                           MPIDI_OFI_GPU_PIPELINE_SEND,
-                           MPIDI_OFI_REQUEST(rreq, pipeline_info.mask_bits),
-                           (void *) &chunk_req->context);
-        if (ret == 0) {
-            DL_DELETE(MPIDI_OFI_global.gpu_recv_queue, recv_task);
-            MPL_free(recv_task);
-        } else if (ret == -FI_EAGAIN || ret == -FI_ENOMEM) {
-            break;
-        } else {
-            goto fn_fail;
-        }
+    if (cvar == MPIR_CVAR_CH4_OFI_GPU_RECEIVE_ENGINE_TYPE_compute) {
+        return MPL_GPU_ENGINE_TYPE_COMPUTE;
+    } else if (cvar == MPIR_CVAR_CH4_OFI_GPU_RECEIVE_ENGINE_TYPE_copy_high_bandwidth) {
+        return MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH;
+    } else if (cvar == MPIR_CVAR_CH4_OFI_GPU_RECEIVE_ENGINE_TYPE_copy_low_latency) {
+        return MPL_GPU_ENGINE_TYPE_COPY_LOW_LATENCY;
+    } else {
+        return MPL_GPU_ENGINE_TYPE_LAST;
     }
-
-  fn_exit:
-    return mpi_errno;
-  fn_fail:
-    mpi_errno = MPI_ERR_OTHER;
-    goto fn_exit;
 }
 
-static int MPIDI_OFI_gpu_progress_task(MPIDI_OFI_gpu_task_t * gpu_queue[], int vni)
+MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_cqe_get_source(struct fi_cq_tagged_entry *wc, bool has_err)
 {
-    int mpi_errno = MPI_SUCCESS;
-    MPIDI_OFI_gpu_task_t *task = NULL;
-    MPIDI_OFI_gpu_task_t *tmp;
-
-    DL_FOREACH_SAFE(gpu_queue[vni], task, tmp) {
-        if (task->status == MPIDI_OFI_PIPELINE_EXEC) {
-            /* Avoid the deadlock of re-launching an executing OFI task. */
-            goto fn_exit;
-        }
-
-        MPIR_gpu_req *yreq = &task->yreq;
-        int completed = 0;
-        if (yreq->type == MPIR_GPU_REQUEST) {
-            mpi_errno = MPL_gpu_test(&yreq->u.gpu_req, &completed);
-            MPIR_ERR_CHECK(mpi_errno);
-        } else if (yreq->type == MPIR_TYPEREP_REQUEST) {
-            MPIR_Typerep_test(yreq->u.y_req, &completed);
-        } else {
-            completed = 1;
-        }
-        if (completed == 1) {
-            /* GPU transfer completes. */
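/* [Editor's annotation, not part of the patch: the open-coded completion test
 * just above -- MPL_gpu_test() for MPIR_GPU_REQUEST, MPIR_Typerep_test() for
 * MPIR_TYPEREP_REQUEST, immediate completion otherwise -- is exactly the
 * pattern this series centralizes behind MPIR_async_test(). The gpu_post.c
 * hunk near the end of this diff polls it as, in sketch form:
 *
 *     int is_done = 0;
 *     MPIR_async_test(&p->async_req, &is_done);
 *     if (is_done) {
 *         // complete the parent request, then report progress
 *     }
 *
 * so the per-type dispatch no longer has to be repeated at every poll site.]
 */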
-            task->status = MPIDI_OFI_PIPELINE_EXEC;
-            MPIR_Request *request = task->request;
-
-            if (task->type == MPIDI_OFI_PIPELINE_SEND) {
-                MPIDI_OFI_gpu_pipeline_request *chunk_req = (MPIDI_OFI_gpu_pipeline_request *)
-                    MPL_malloc(sizeof(MPIDI_OFI_gpu_pipeline_request), MPL_MEM_BUFFER);
-                MPIR_ERR_CHKANDJUMP1(chunk_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem",
-                                     "**nomem %s", "GPU pipelining chunk_req alloc");
-                chunk_req->parent = request;
-                chunk_req->event_id = MPIDI_OFI_EVENT_SEND_GPU_PIPELINE;
-                chunk_req->buf = task->buf;
-                MPIDI_OFI_CALL_RETRY(fi_tsenddata
-                                     (MPIDI_OFI_global.ctx
-                                      [MPIDI_OFI_REQUEST(request, pipeline_info.ctx_idx)].tx,
-                                      task->buf, task->len, NULL /* desc */ ,
-                                      MPIDI_OFI_REQUEST(request, pipeline_info.cq_data),
-                                      MPIDI_OFI_REQUEST(request, pipeline_info.remote_addr),
-                                      MPIDI_OFI_REQUEST(request,
-                                                        pipeline_info.match_bits) |
-                                      MPIDI_OFI_GPU_PIPELINE_SEND, (void *) &chunk_req->context),
-                                     vni, fi_tsenddata);
-                DL_DELETE(gpu_queue[vni], task);
-                MPL_free(task);
-            } else {
-                MPIR_Assert(task->type == MPIDI_OFI_PIPELINE_RECV);
-                int c;
-                MPIR_cc_decr(request->cc_ptr, &c);
-                if (c == 0) {
-                    /* If synchronous, ack and complete when the ack is done */
-                    if (unlikely(MPIDI_OFI_REQUEST(request, pipeline_info.is_sync))) {
-                        MPIR_Comm *comm = request->comm;
-                        uint64_t ss_bits =
-                            MPIDI_OFI_init_sendtag(MPL_atomic_relaxed_load_int
-                                                   (&MPIDI_OFI_REQUEST(request, util_id)),
-                                                   MPIR_Comm_rank(comm), request->status.MPI_TAG,
-                                                   MPIDI_OFI_SYNC_SEND_ACK);
-                        int r = request->status.MPI_SOURCE;
-                        int vci_src = MPIDI_get_vci(SRC_VCI_FROM_RECVER, comm, r, comm->rank,
-                                                    request->status.MPI_TAG);
-                        int vci_dst = MPIDI_get_vci(DST_VCI_FROM_RECVER, comm, r, comm->rank,
-                                                    request->status.MPI_TAG);
-                        int vci_local = vci_dst;
-                        int vci_remote = vci_src;
-                        int nic = 0;
-                        int ctx_idx = MPIDI_OFI_get_ctx_index(vci_local, nic);
-                        fi_addr_t dest_addr = MPIDI_OFI_comm_to_phys(comm, r, nic, vci_remote);
-                        MPIDI_OFI_CALL_RETRY(fi_tinjectdata
-                                             (MPIDI_OFI_global.ctx[ctx_idx].tx, NULL /* buf */ ,
-                                              0 /* len */ ,
-                                              MPIR_Comm_rank(comm), dest_addr, ss_bits),
-                                             vci_local, tinjectdata);
-                    }
-
-                    MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(request, datatype));
-                    /* Set number of bytes in status. */
-                    MPIR_STATUS_SET_COUNT(request->status,
-                                          MPIDI_OFI_REQUEST(request, pipeline_info.offset));
-
-                    MPIR_Request_free(request);
-                }
-
-                /* For recv, now task can be deleted from DL. */
-                DL_DELETE(gpu_queue[vni], task);
-                /* Free host buffer, yaksa request and task. */
-                if (task->type == MPIDI_OFI_PIPELINE_RECV)
-                    MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool,
-                                                      task->buf);
-                else
-                    MPIDI_OFI_gpu_free_pack_buffer(task->buf);
-                MPL_free(task);
-            }
-        } else {
-            goto fn_exit;
+    if (MPIDI_OFI_ENABLE_DATA) {
+        if (unlikely(has_err)) {
+            return wc->data & ((1 << MPIDI_OFI_IDATA_SRC_BITS) - 1);
         }
+        return wc->data;
+    } else {
+        return MPIDI_OFI_init_get_source(wc->tag);
     }
 }
-
-  fn_exit:
-    return mpi_errno;
-  fn_fail:
-    mpi_errno = MPI_ERR_OTHER;
-    goto fn_exit;
-}
 
-MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_gpu_progress(int vni)
-{
-    int mpi_errno = MPI_SUCCESS;
-
-    mpi_errno = MPIDI_OFI_gpu_progress_task(MPIDI_OFI_global.gpu_recv_task_queue, vni);
-    MPIR_ERR_CHECK(mpi_errno);
-    mpi_errno = MPIDI_OFI_gpu_progress_task(MPIDI_OFI_global.gpu_send_task_queue, vni);
-    MPIR_ERR_CHECK(mpi_errno);
-    mpi_errno = MPIDI_OFI_gpu_progress_send();
-    MPIR_ERR_CHECK(mpi_errno);
-    mpi_errno = MPIDI_OFI_gpu_progress_recv();
-    MPIR_ERR_CHECK(mpi_errno);
-
-  fn_exit:
-    return mpi_errno;
-  fn_fail:
-    goto fn_exit;
-}
+int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
+                                MPI_Aint count, MPI_Datatype datatype,
+                                MPL_pointer_attr_t attr, MPI_Aint data_sz,
+                                uint64_t cq_data, fi_addr_t remote_addr,
+                                int vci_local, int ctx_idx, uint64_t match_bits, int pipeline_tag);
+int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq,
+                                void *recv_buf, MPI_Aint count, MPI_Datatype datatype,
+                                fi_addr_t remote_addr, int vci_local,
+                                uint64_t match_bits, uint64_t mask_bits,
+                                MPI_Aint data_sz, int ctx_idx);
+int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
+int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
+int MPIDI_OFI_gpu_pipeline_recv_unexp_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
 
 #endif /* OFI_IMPL_H_INCLUDED */
diff --git a/src/mpid/ch4/netmod/ofi/ofi_init.c b/src/mpid/ch4/netmod/ofi/ofi_init.c
index 134a62cea4e..59131aea264 100644
--- a/src/mpid/ch4/netmod/ofi/ofi_init.c
+++ b/src/mpid/ch4/netmod/ofi/ofi_init.c
@@ -734,8 +734,6 @@ int MPIDI_OFI_init_local(int *tag_bits)
                                                  host_free_registered,
                                                  &MPIDI_OFI_global.gpu_pipeline_recv_pool);
         MPIR_ERR_CHECK(mpi_errno);
-        MPIDI_OFI_global.gpu_send_queue = NULL;
-        MPIDI_OFI_global.gpu_recv_queue = NULL;
     }
 
     /* Initialize RMA keys allocator */
diff --git a/src/mpid/ch4/netmod/ofi/ofi_pre.h b/src/mpid/ch4/netmod/ofi/ofi_pre.h
index 8ccdb44a499..d7b67ae123d 100644
--- a/src/mpid/ch4/netmod/ofi/ofi_pre.h
+++ b/src/mpid/ch4/netmod/ofi/ofi_pre.h
@@ -48,6 +48,7 @@ typedef struct {
     int enable_striping;        /* Flag to enable striping per communicator. */
     int enable_hashing;         /* Flag to enable hashing per communicator. */
     int *pref_nic;              /* Array to specify the preferred NIC for each rank (if needed) */
+    int pipeline_tag;           /* match_bits for gpu_pipeline chunks */
 } MPIDI_OFI_comm_t;
 
 enum {
     MPIDI_AMTYPE_NONE = 0,
@@ -200,37 +201,63 @@ typedef struct {
  * if needed. */
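/* [Editor's annotation, not part of the patch: the hunk below replaces the
 * request's catch-all unions (huge, noncontig, util) and the flat
 * pipeline_info struct with a single union `u' holding one named member per
 * send/recv path, so each path only ever touches its own fields. Typical
 * accesses, taken from later hunks in this same diff:
 *
 *     MPIDI_OFI_REQUEST(sreq, u.pack_send.pack_buffer) = pack_buffer;
 *     MPIDI_OFI_REQUEST(rreq, u.recv.remote_info) = NULL;
 *
 * Only the recv member needs fields preset to NULL, because the receiver
 * cannot know in advance which send path the incoming message took.]
 */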
     enum MPIDI_OFI_req_kind kind;
     union {
-        struct fid_mr **send_mrs;
-        void *remote_info;
-    } huge;
-    union {
+        /* send path */
+        struct {
+            void *pack_buffer;
+        } pack_send;
+        struct {
+            struct iovec *iovs;
+        } nopack_send;
+        struct {
+            struct fid_mr **mrs;
+            void *pack_buffer;
+        } huge_send;
+        struct {
+            int vci_local;
+            int ctx_idx;
+            fi_addr_t remote_addr;
+            uint64_t cq_data;
+            uint64_t match_bits;
+            int pipeline_tag;
+            int num_remain;
+        } pipeline_send;
+        struct {
+            void *inject_buf;
+        } am_inject_emu;
+
+        /* The recv path can be uncertain depending on the actual send path,
+         * thus some fields are significant and need to be preset to NULL.
+         */
         struct {
+            void *remote_info;  /* huge path if not NULL */
+            char *pack_buffer;  /* need unpack if not NULL */
             void *buf;
-            size_t count;
+            MPI_Aint count;
             MPI_Datatype datatype;
-            char *pack_buffer;
-        } pack;
-        struct iovec *nopack;
-    } noncontig;
-    union {
-        struct iovec iov;
-        void *inject_buf;       /* Internal buffer for inject emulation */
-    } util;
-    struct {
-        fi_addr_t remote_addr;
-        int ctx_idx;
-        int vci_local;
-        int chunk_sz;
-        bool is_sync;
-        uint64_t cq_data;
-        uint64_t match_bits;
-        uint64_t mask_bits;
-        size_t offset;
-        size_t data_sz;
-        char *pack_recv_buf;
-        void *usm_host_buf;     /* recv */
-        MPIR_Request *req;
-    } pipeline_info;            /* GPU pipeline */
+            struct iovec msg_iov;       /* FI_CLAIM requires fi_trecvmsg, which requires use of an iov.
+                                         * We always set it to {recv_buf, data_sz} since they are
+                                         * useful for the huge recv path as well.
+                                         */
+        } recv;
+        struct {
+            struct iovec *iovs;
+        } nopack_recv;
+        struct {
+            int vci_local;
+            int ctx_idx;
+            fi_addr_t remote_addr;
+            uint64_t match_bits;
+            uint64_t mask_bits;
+            MPI_Aint offset;
+            int pipeline_tag;
+            int num_inrecv;
+            int num_remain;
+            bool is_sync;
+            void *buf;
+            MPI_Aint count;
+            MPI_Datatype datatype;
+        } pipeline_recv;
+    } u;
 } MPIDI_OFI_request_t;
 
 typedef struct {
diff --git a/src/mpid/ch4/netmod/ofi/ofi_probe.h b/src/mpid/ch4/netmod/ofi/ofi_probe.h
index a1267133975..0414096714e 100644
--- a/src/mpid/ch4/netmod/ofi/ofi_probe.h
+++ b/src/mpid/ch4/netmod/ofi/ofi_probe.h
@@ -49,7 +49,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_iprobe(int source,
     } else {
         MPIDI_OFI_REQUEST(rreq, kind) = MPIDI_OFI_req_kind__probe;
     }
-    MPIDI_OFI_REQUEST(rreq, huge.remote_info) = NULL;
+    MPIDI_OFI_REQUEST(rreq, u.recv.remote_info) = NULL;
 
     rreq->comm = comm;
     MPIR_Comm_add_ref(comm);
diff --git a/src/mpid/ch4/netmod/ofi/ofi_progress.h b/src/mpid/ch4/netmod/ofi/ofi_progress.h
index bf87b13ca88..7a24e0f8562 100644
--- a/src/mpid/ch4/netmod/ofi/ofi_progress.h
+++ b/src/mpid/ch4/netmod/ofi/ofi_progress.h
@@ -82,12 +82,9 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_progress(int vci, int *made_progress)
          * to do, so simply return.
          * NOTE: it is not an error since global progress will poll every vci. */
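/* [Editor's annotation, not part of the patch: with the MPIDI_OFI_gpu_progress()
 * call gone, MPIDI_NM_progress() no longer has a failure path, so the early
 * `return MPI_SUCCESS' below becomes `goto fn_exit' and the now-unreachable
 * fn_fail label is dropped. The single-exit convention being preserved looks
 * like this, in sketch form:
 *
 *     int mpi_errno = MPI_SUCCESS;
 *     ...
 *   fn_exit:
 *     MPIR_FUNC_EXIT;
 *     return mpi_errno;
 *
 * so every path funnels through one exit point.]
 */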
-        return MPI_SUCCESS;
+        goto fn_exit;
     }
 
-    mpi_errno = MPIDI_OFI_gpu_progress(vci);
-    MPIR_ERR_CHECK(mpi_errno);
-
     if (unlikely(MPIDI_OFI_has_cq_buffered(vci))) {
         int num = MPIDI_OFI_get_buffered(vci, wc);
         mpi_errno = MPIDI_OFI_handle_cq_entries(vci, wc, num);
@@ -113,8 +110,6 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_progress(int vci, int *made_progress)
   fn_exit:
     MPIR_FUNC_EXIT;
     return mpi_errno;
-  fn_fail:
-    goto fn_exit;
 }
 
 #endif /* OFI_PROGRESS_H_INCLUDED */
diff --git a/src/mpid/ch4/netmod/ofi/ofi_recv.h b/src/mpid/ch4/netmod/ofi/ofi_recv.h
index fd00fc42fbb..f5beb6fd8ec 100644
--- a/src/mpid/ch4/netmod/ofi/ofi_recv.h
+++ b/src/mpid/ch4/netmod/ofi/ofi_recv.h
@@ -56,18 +56,18 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_recv_iov(void *buf, MPI_Aint count, size_
         flags = FI_COMPLETION;
     }
 
-    size = num_contig * sizeof(struct iovec) + sizeof(*(MPIDI_OFI_REQUEST(rreq, noncontig.nopack)));
+    size = (num_contig + 1) * sizeof(struct iovec);
 
-    MPIDI_OFI_REQUEST(rreq, noncontig.nopack) = MPL_malloc(size, MPL_MEM_BUFFER);
-    memset(MPIDI_OFI_REQUEST(rreq, noncontig.nopack), 0, size);
+    MPIDI_OFI_REQUEST(rreq, u.nopack_recv.iovs) = MPL_malloc(size, MPL_MEM_BUFFER);
+    memset(MPIDI_OFI_REQUEST(rreq, u.nopack_recv.iovs), 0, size);
 
     MPI_Aint actual_iov_len;
     MPIR_Typerep_to_iov_offset(buf, count, MPIDI_OFI_REQUEST(rreq, datatype), 0,
-                               MPIDI_OFI_REQUEST(rreq, noncontig.nopack), num_contig,
+                               MPIDI_OFI_REQUEST(rreq, u.nopack_recv.iovs), num_contig,
                                &actual_iov_len);
     assert(num_contig == actual_iov_len);
 
-    originv = &(MPIDI_OFI_REQUEST(rreq, noncontig.nopack[0]));
+    originv = &(MPIDI_OFI_REQUEST(rreq, u.nopack_recv.iovs[0]));
 
     if (rreq->comm == NULL) {
         rreq->comm = comm;
@@ -153,11 +153,13 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_irecv(void *buf,
         goto fn_fail;
     }
 
-    *request = rreq;
     MPIDI_OFI_REQUEST(rreq, kind) = MPIDI_OFI_req_kind__any;
+    /* preset some fields to NULL */
     if (!flags) {
-        MPIDI_OFI_REQUEST(rreq, huge.remote_info) = NULL;       /* for huge recv remote info */
+        /* remote_info may already be set by mprobe, so skip this for mrecv. */
+        MPIDI_OFI_REQUEST(rreq, u.recv.remote_info) = NULL;
     }
+    MPIDI_OFI_REQUEST(rreq, u.recv.pack_buffer) = NULL;
 
     /* Calculate the correct NICs. */
     receiver_nic =
@@ -222,14 +224,6 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_irecv(void *buf,
             data_sz >= MPIR_CVAR_CH4_OFI_GPU_PIPELINE_THRESHOLD) {
             /* Pipeline path */
             MPL_atomic_relaxed_store_int(&MPIDI_OFI_REQUEST(rreq, util_id), context_id);
-            MPIDI_OFI_REQUEST(rreq, event_id) = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT;
-            /* Only post first recv with pipeline chunk size. */
-            char *host_buf = NULL;
-            MPIDU_genq_private_pool_force_alloc_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool,
-                                                     (void **) &host_buf);
-            MPIR_ERR_CHKANDJUMP1(host_buf == NULL, mpi_errno,
-                                 MPI_ERR_OTHER, "**nomem", "**nomem %s",
-                                 "Pipeline Init recv alloc");
 
             fi_addr_t remote_addr;
             if (MPI_ANY_SOURCE == rank)
@@ -241,55 +235,24 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_irecv(void *buf,
                 remote_addr = MPIDI_OFI_av_to_phys(addr, sender_nic, vci_remote);
             }
 
-            /* Save pipeline information. */
-            MPIDI_OFI_REQUEST(rreq, pipeline_info.offset) = 0;
-            MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = false;
-            MPIDI_OFI_REQUEST(rreq, pipeline_info.remote_addr) = remote_addr;
-            MPIDI_OFI_REQUEST(rreq, pipeline_info.vci_local) = vci_local;
-            MPIDI_OFI_REQUEST(rreq, pipeline_info.match_bits) = match_bits;
-            MPIDI_OFI_REQUEST(rreq, pipeline_info.mask_bits) = mask_bits;
-            MPIDI_OFI_REQUEST(rreq, pipeline_info.data_sz) = data_sz;
-            MPIDI_OFI_REQUEST(rreq, pipeline_info.ctx_idx) = ctx_idx;
-
-            /* Save original buf, datatype and count */
-            MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer) = host_buf;
-            MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf) = buf;
-            MPIDI_OFI_REQUEST(rreq, noncontig.pack.count) = count;
-            MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype) = datatype;
-
             if (rreq->comm == NULL) {
                 rreq->comm = comm;
                 MPIR_Comm_add_ref(comm);
             }
 
-            MPIDI_OFI_gpu_pipeline_request *chunk_req;
-            chunk_req = (MPIDI_OFI_gpu_pipeline_request *)
-                MPL_malloc(sizeof(MPIDI_OFI_gpu_pipeline_request), MPL_MEM_BUFFER);
-            MPIR_ERR_CHKANDJUMP1(chunk_req == NULL, mpi_errno,
-                                 MPI_ERR_OTHER, "**nomem", "**nomem %s", "Recv chunk_req alloc");
-            chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT;
-            chunk_req->parent = rreq;
-            chunk_req->buf = host_buf;
-            int ret = 0;
-            ret = fi_trecv(MPIDI_OFI_global.ctx[ctx_idx].rx,
-                           host_buf,
-                           MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ,
-                           NULL, remote_addr, match_bits, mask_bits, (void *) &chunk_req->context);
-            if (MPIDI_OFI_global.gpu_recv_queue || !host_buf || ret != 0) {
-                MPIDI_OFI_gpu_pending_recv_t *recv_task =
-                    MPIDI_OFI_create_recv_task(chunk_req, 0, -1);
-                DL_APPEND(MPIDI_OFI_global.gpu_recv_queue, recv_task);
-            }
+            mpi_errno = MPIDI_OFI_gpu_pipeline_recv(rreq, buf, count, datatype,
+                                                    remote_addr, vci_local,
+                                                    match_bits, mask_bits, data_sz, ctx_idx);
             goto fn_exit;
         }
 
         /* Unpack */
         MPIDI_OFI_REQUEST(rreq, event_id) = MPIDI_OFI_EVENT_RECV_PACK;
-        MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer) = MPL_malloc(data_sz, MPL_MEM_OTHER);
-        MPIR_ERR_CHKANDJUMP1(MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer) == NULL, mpi_errno,
+        MPIDI_OFI_REQUEST(rreq, u.recv.pack_buffer) = MPL_malloc(data_sz, MPL_MEM_OTHER);
+        MPIR_ERR_CHKANDJUMP1(MPIDI_OFI_REQUEST(rreq, u.recv.pack_buffer) == NULL, mpi_errno,
                              MPI_ERR_OTHER, "**nomem", "**nomem %s", "Recv Pack Buffer alloc");
-        recv_buf = MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer);
-        MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf) = buf;
+        recv_buf = MPIDI_OFI_REQUEST(rreq, u.recv.pack_buffer);
+        MPIDI_OFI_REQUEST(rreq, u.recv.buf) = buf;
 #ifdef MPL_HAVE_ZE
         if (dt_contig && attr.type == MPL_GPU_POINTER_DEV) {
             int mpl_err = MPL_SUCCESS;
@@ -297,14 +260,11 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_irecv(void *buf,
             mpl_err = MPL_ze_mmap_device_pointer(buf, &attr.device_attr, attr.device, &ptr);
             MPIR_ERR_CHKANDJUMP(mpl_err != MPL_SUCCESS, mpi_errno, MPI_ERR_OTHER,
                                 "**mpl_ze_mmap_device_ptr");
-            MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf) = ptr;
+            MPIDI_OFI_REQUEST(rreq, u.recv.buf) = ptr;
         }
#endif
-        MPIDI_OFI_REQUEST(rreq, noncontig.pack.count) = count;
-        MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype) = datatype;
-    } else {
-        MPIDI_OFI_REQUEST(rreq, noncontig.pack.pack_buffer) = NULL;
-        MPIDI_OFI_REQUEST(rreq, noncontig.nopack) = NULL;
+        MPIDI_OFI_REQUEST(rreq, u.recv.count) = count;
+        MPIDI_OFI_REQUEST(rreq, u.recv.datatype) = datatype;
     }
 
     if (rreq->comm == NULL) {
@@ -313,8 +273,9 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_irecv(void *buf,
     }
 
     /* Read ordering unnecessary for context_id, so use relaxed load */
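/* [Editor's annotation, not part of the patch: the relaxed store below is
 * paired with equally relaxed read-backs elsewhere, e.g. the pipeline code
 * removed earlier in this diff read it as
 *
 *     MPL_atomic_relaxed_load_int(&MPIDI_OFI_REQUEST(rreq, util_id));
 *
 * hence no acquire/release ordering is required on util_id.]
 */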
     MPL_atomic_relaxed_store_int(&MPIDI_OFI_REQUEST(rreq, util_id), context_id);
-    MPIDI_OFI_REQUEST(rreq, util.iov.iov_base) = recv_buf;
-    MPIDI_OFI_REQUEST(rreq, util.iov.iov_len) = data_sz;
+    /* msg_iov is needed to use fi_trecvmsg (with FI_CLAIM) or the huge recv path */
+    MPIDI_OFI_REQUEST(rreq, u.recv.msg_iov.iov_base) = recv_buf;
+    MPIDI_OFI_REQUEST(rreq, u.recv.msg_iov.iov_len) = data_sz;
 
     if (unlikely(data_sz >= MPIDI_OFI_global.max_msg_size) &&
         !MPIDI_OFI_COMM(comm).enable_striping) {
         MPIDI_OFI_REQUEST(rreq, event_id) = MPIDI_OFI_EVENT_RECV_HUGE;
@@ -342,7 +303,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_irecv(void *buf,
                                  (void *) &(MPIDI_OFI_REQUEST(rreq, context))),
                         vci_local, trecv);
     } else {
-        msg.msg_iov = &MPIDI_OFI_REQUEST(rreq, util.iov);
+        msg.msg_iov = &MPIDI_OFI_REQUEST(rreq, u.recv.msg_iov);
         msg.desc = desc;
         msg.iov_count = 1;
         msg.tag = match_bits;
@@ -475,7 +436,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_NM_mpi_cancel_recv(MPIR_Request * rreq, bool
     fi_cancel((fid_t) MPIDI_OFI_global.ctx[ctx_idx].rx, &(MPIDI_OFI_REQUEST(rreq, context)));
 
     if (is_blocking) {
-        /* progress until the rreq complets, either with cancel-bit set,
+        /* progress until the rreq completes, either with cancel-bit set,
          * or with message received */
         while (!MPIR_cc_is_complete(&rreq->cc)) {
             mpi_errno = MPIDI_OFI_progress_uninlined(vci);
diff --git a/src/mpid/ch4/netmod/ofi/ofi_send.h b/src/mpid/ch4/netmod/ofi/ofi_send.h
index fc3eb86315f..ae1a58996e5 100644
--- a/src/mpid/ch4/netmod/ofi/ofi_send.h
+++ b/src/mpid/ch4/netmod/ofi/ofi_send.h
@@ -8,19 +8,6 @@
 
 #include "ofi_impl.h"
 
-MPL_STATIC_INLINE_PREFIX MPL_gpu_engine_type_t MPIDI_OFI_gpu_get_send_engine_type(int cvar)
-{
-    if (cvar == MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE_compute) {
-        return MPL_GPU_ENGINE_TYPE_COMPUTE;
-    } else if (cvar == MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE_copy_high_bandwidth) {
-        return MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH;
-    } else if (cvar == MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE_copy_low_latency) {
-        return MPL_GPU_ENGINE_TYPE_COPY_LOW_LATENCY;
-    } else {
-        return MPL_GPU_ENGINE_TYPE_LAST;
-    }
-}
-
 MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_lightweight(const void *buf,
                                                         size_t data_sz,
                                                         uint64_t cq_data,
@@ -114,18 +101,18 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_iov(const void *buf, MPI_Aint count,
         flags = FI_COMPLETION | (MPIDI_OFI_ENABLE_DATA ? FI_REMOTE_CQ_DATA : 0);
 
     MPIDI_OFI_REQUEST(sreq, event_id) = MPIDI_OFI_EVENT_SEND_NOPACK;
 
-    size = num_contig * sizeof(struct iovec) + sizeof(*(MPIDI_OFI_REQUEST(sreq, noncontig.nopack)));
+    size = (num_contig + 1) * sizeof(struct iovec);
 
-    MPIDI_OFI_REQUEST(sreq, noncontig.nopack) = MPL_malloc(size, MPL_MEM_BUFFER);
-    memset(MPIDI_OFI_REQUEST(sreq, noncontig.nopack), 0, size);
+    MPIDI_OFI_REQUEST(sreq, u.nopack_send.iovs) = MPL_malloc(size, MPL_MEM_BUFFER);
+    memset(MPIDI_OFI_REQUEST(sreq, u.nopack_send.iovs), 0, size);
 
     MPI_Aint actual_iov_len;
     MPIR_Typerep_to_iov_offset(buf, count, MPIDI_OFI_REQUEST(sreq, datatype), 0,
-                               MPIDI_OFI_REQUEST(sreq, noncontig.nopack), num_contig,
+                               MPIDI_OFI_REQUEST(sreq, u.nopack_send.iovs), num_contig,
                                &actual_iov_len);
     assert(num_contig == actual_iov_len);
 
-    originv = &(MPIDI_OFI_REQUEST(sreq, noncontig.nopack[0]));
+    originv = &(MPIDI_OFI_REQUEST(sreq, u.nopack_send.iovs[0]));
 
     msg.msg_iov = originv;
     msg.desc = NULL;
@@ -164,6 +151,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou
 {
     int mpi_errno = MPI_SUCCESS;
     char *send_buf;
+    void *pack_buffer = NULL;
     uint64_t match_bits;
     bool force_gpu_pack = false;
     int vci_local = vci_src;
@@ -274,61 +262,29 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou
 
         if (force_gpu_pack && MPIR_CVAR_CH4_OFI_ENABLE_GPU_PIPELINE &&
             data_sz >= MPIR_CVAR_CH4_OFI_GPU_PIPELINE_THRESHOLD) {
             /* Pipeline path */
-            uint32_t n_chunks = 0;
-            int chunk_size = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ;
-            if (dt_contig) {
-                /* Update correct number of chunks in immediate data. */
-                chunk_size = MPIDI_OFI_gpu_pipeline_chunk_size(data_sz);
-                n_chunks = data_sz / chunk_size;
-                if (data_sz % chunk_size)
-                    n_chunks++;
-                MPIDI_OFI_idata_set_gpuchunk_bits(&cq_data, n_chunks);
-            }
+            fi_addr_t remote_addr = MPIDI_OFI_av_to_phys(addr, receiver_nic, vci_remote);
+            MPIDI_OFI_COMM(comm).pipeline_tag += 1;
+            mpi_errno = MPIDI_OFI_gpu_pipeline_send(sreq, buf, count, datatype, attr, data_sz,
+                                                    cq_data, remote_addr, vci_local, ctx_idx,
+                                                    match_bits, MPIDI_OFI_COMM(comm).pipeline_tag);
+            MPIR_ERR_CHECK(mpi_errno);
 
-            /* Update sender packed bit if necessary. */
-            uint64_t is_packed = datatype == MPI_PACKED ? 1 : 0;
-            MPIDI_OFI_idata_set_gpu_packed_bit(&cq_data, is_packed);
-            MPIR_ERR_CHKANDJUMP(is_packed, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed");
-
-            /* Save pipeline information. */
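/* [Editor's annotation, not part of the patch: the bookkeeping removed below,
 * down to the DL_APPEND()/MPIDI_OFI_gpu_progress_send() pair, now lives behind
 * the single MPIDI_OFI_gpu_pipeline_send() call added above. Chunk accounting
 * also leaves the immediate data: the gpuchunk/packed idata helpers are
 * deleted in ofi_types.h further below, in favor of the per-communicator
 * pipeline_tag plus one flag bit, presumably set as
 *
 *     cq_data |= MPIDI_OFI_IDATA_PIPELINE;
 *
 * (assumed usage -- the code that sets the bit is outside this diff).]
 */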
-            MPIDI_OFI_REQUEST(sreq, pipeline_info.chunk_sz) = chunk_size;
-            MPIDI_OFI_REQUEST(sreq, pipeline_info.cq_data) = cq_data;
-            MPIDI_OFI_REQUEST(sreq, pipeline_info.remote_addr) =
-                MPIDI_OFI_av_to_phys(addr, receiver_nic, vci_remote);
-            MPIDI_OFI_REQUEST(sreq, pipeline_info.vci_local) = vci_local;
-            MPIDI_OFI_REQUEST(sreq, pipeline_info.ctx_idx) = ctx_idx;
-            MPIDI_OFI_REQUEST(sreq, pipeline_info.match_bits) = match_bits;
-            MPIDI_OFI_REQUEST(sreq, pipeline_info.data_sz) = data_sz;
-
-            /* send an empty message for tag matching */
-            MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx,
-                                                NULL,
-                                                0,
-                                                cq_data,
-                                                MPIDI_OFI_REQUEST(sreq, pipeline_info.remote_addr),
-                                                match_bits), vci_local, tinjectdata);
             MPIR_T_PVAR_COUNTER_INC(MULTINIC, nic_sent_bytes_count[sender_nic], data_sz);
-
-            MPIDI_OFI_gpu_pending_send_t *send_task =
-                MPIDI_OFI_create_send_task(sreq, (void *) buf, attr, data_sz, count, dt_contig);
-            DL_APPEND(MPIDI_OFI_global.gpu_send_queue, send_task);
-            MPIDI_OFI_gpu_progress_send();
-
             goto fn_exit;
         }
 
         /* Pack */
         MPIDI_OFI_REQUEST(sreq, event_id) = MPIDI_OFI_EVENT_SEND_PACK;
 
-        MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer) = MPL_malloc(data_sz, MPL_MEM_OTHER);
-        MPIR_ERR_CHKANDJUMP1(MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer) == NULL, mpi_errno,
+        pack_buffer = MPL_malloc(data_sz, MPL_MEM_OTHER);
+        MPIR_ERR_CHKANDJUMP1(pack_buffer == NULL, mpi_errno,
                              MPI_ERR_OTHER, "**nomem", "**nomem %s", "Send Pack buffer alloc");
 
         int fast_copy = 0;
         if (attr.type == MPL_GPU_POINTER_DEV && dt_contig &&
             data_sz <= MPIR_CVAR_CH4_IPC_GPU_FAST_COPY_MAX_SIZE) {
             int mpl_err = MPL_gpu_fast_memcpy(send_buf, &attr,
-                                              MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer),
+                                              pack_buffer,
                                               NULL, data_sz);
             if (mpl_err == MPL_SUCCESS)
                 fast_copy = 1;
@@ -339,21 +295,18 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou
             if (dt_contig && engine != MPL_GPU_ENGINE_TYPE_LAST &&
                 MPL_gpu_query_pointer_is_dev(send_buf, &attr)) {
                 mpi_errno = MPIR_Localcopy_gpu(send_buf, data_sz, MPI_BYTE, 0, &attr,
-                                               MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer),
+                                               pack_buffer,
                                                data_sz, MPI_BYTE, 0, NULL,
                                                MPL_GPU_COPY_DIRECTION_NONE, engine, true);
                 MPIR_ERR_CHECK(mpi_errno);
             } else {
                 MPI_Aint actual_pack_bytes;
                 MPIR_Typerep_pack(buf, count, datatype, 0,
-                                  MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer), data_sz,
-                                  &actual_pack_bytes, MPIR_TYPEREP_FLAG_NONE);
+                                  pack_buffer, data_sz, &actual_pack_bytes, MPIR_TYPEREP_FLAG_NONE);
             }
         }
-        send_buf = MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer);
-    } else {
-        MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer) = NULL;
-        MPIDI_OFI_REQUEST(sreq, noncontig.nopack) = NULL;
+        send_buf = pack_buffer;
+        MPIDI_OFI_REQUEST(sreq, u.pack_send.pack_buffer) = pack_buffer;
     }
 
     fi_addr_t dest_addr = MPIDI_OFI_av_to_phys(addr, receiver_nic, vci_remote);
@@ -384,6 +337,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou
         }
         MPIR_T_PVAR_COUNTER_INC(MULTINIC, nic_sent_bytes_count[sender_nic], data_sz);
     } else if (unlikely(1)) {
+        /* is_huge_send */
         int num_nics = MPIDI_OFI_global.num_nics;
         uint64_t rma_keys[MPIDI_OFI_MAX_NICS];
         struct fid_mr **huge_send_mrs;
@@ -425,7 +379,9 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou
                                       (vci_local, i)].ep, NULL);
             MPIR_ERR_CHECK(mpi_errno);
         }
-        MPIDI_OFI_REQUEST(sreq, huge.send_mrs) = huge_send_mrs;
+        MPIDI_OFI_REQUEST(sreq, u.huge_send.mrs) = huge_send_mrs;
+        MPIDI_OFI_REQUEST(sreq, u.huge_send.pack_buffer) = pack_buffer;
+
         if (MPIDI_OFI_ENABLE_MR_PROV_KEY) {
             /* MR_BASIC */
             for (int i = 0; i < num_nics; i++) {
@@ -527,8 +483,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_fallback(const void *buf, MPI_Aint c
     iovs[0].iov_base = send_buf;
     iovs[0].iov_len = data_sz;
 
-    MPIDI_OFI_REQUEST(sreq, noncontig.pack.pack_buffer) = NULL;
-    MPIDI_OFI_REQUEST(sreq, noncontig.nopack) = NULL;
+    /* FIXME: is presetting pack_buffer to NULL here still necessary? */
+    MPIDI_OFI_REQUEST(sreq, u.pack_send.pack_buffer) = NULL;
 
     struct fi_msg_tagged msg;
     msg.msg_iov = iovs;
diff --git a/src/mpid/ch4/netmod/ofi/ofi_types.h b/src/mpid/ch4/netmod/ofi/ofi_types.h
index fe32de1918f..657fe08261a 100644
--- a/src/mpid/ch4/netmod/ofi/ofi_types.h
+++ b/src/mpid/ch4/netmod/ofi/ofi_types.h
@@ -37,14 +37,12 @@
 #define MPIDI_OFI_IDATA_ERROR_BITS (2)
 /* The number of bits in the immediate data field allocated to the source rank and error propagation. */
 #define MPIDI_OFI_IDATA_SRC_ERROR_BITS (MPIDI_OFI_IDATA_SRC_BITS + MPIDI_OFI_IDATA_ERROR_BITS)
-/* The number of bits in the immediate data field allocated to MPI_Packed datatype for GPU. */
-#define MPIDI_OFI_IDATA_GPU_PACKED_BITS (1)
-/* The offset of bits in the immediate data field allocated to number of message chunks. */
-#define MPIDI_OFI_IDATA_GPUCHUNK_OFFSET (MPIDI_OFI_IDATA_SRC_ERROR_BITS + MPIDI_OFI_IDATA_GPU_PACKED_BITS)
 /* Bit mask for MPIR_ERR_OTHER */
 #define MPIDI_OFI_ERR_OTHER (0x1ULL)
 /* Bit mask for MPIR_PROC_FAILED */
 #define MPIDI_OFI_ERR_PROC_FAILED (0x2ULL)
+/* Bit mask for gpu pipeline */
+#define MPIDI_OFI_IDATA_PIPELINE (1ULL << 32)
 
 /* Set the error bits */
 MPL_STATIC_INLINE_PREFIX void MPIDI_OFI_idata_set_error_bits(uint64_t * data_field,
@@ -75,30 +73,6 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_idata_get_error_bits(uint64_t idata)
     }
 }
 
-/* Set the gpu packed bit */
-static inline void MPIDI_OFI_idata_set_gpu_packed_bit(uint64_t * data_field, uint64_t is_packed)
-{
-    *data_field = (*data_field) | (is_packed << MPIDI_OFI_IDATA_SRC_ERROR_BITS);
-}
-
-/* Get the gpu packed bit from the OFI data field. */
-static inline uint32_t MPIDI_OFI_idata_get_gpu_packed_bit(uint64_t idata)
-{
-    return (idata >> MPIDI_OFI_IDATA_SRC_ERROR_BITS) & 0x1ULL;
-}
-
-/* Set gpu chunk bits */
-static inline void MPIDI_OFI_idata_set_gpuchunk_bits(uint64_t * data_field, uint64_t n_chunks)
-{
-    *data_field = (*data_field) | (n_chunks << MPIDI_OFI_IDATA_GPUCHUNK_OFFSET);
-}
-
-/* Get gpu chunks from the OFI data field. */
-static inline uint32_t MPIDI_OFI_idata_get_gpuchunk_bits(uint64_t idata)
-{
-    return (idata >> MPIDI_OFI_IDATA_GPUCHUNK_OFFSET);
-}
-
 /* There are 4 protocol bits:
  * - MPIDI_DYNPROC_SEND
 * - MPIDI_OFI_HUGE_SEND
@@ -337,43 +311,6 @@ typedef struct {
     struct fid_cq *cq;
 } MPIDI_OFI_context_t;
 
-/* GPU pipelining */
-typedef struct {
-    char pad[MPIDI_REQUEST_HDR_SIZE];
-    struct fi_context context[MPIDI_OFI_CONTEXT_STRUCTS];   /* fixed field, do not move */
-    int event_id;               /* fixed field, do not move */
-    MPIR_Request *parent;       /* Parent request */
-    void *buf;
-} MPIDI_OFI_gpu_pipeline_request;
-
-typedef struct MPIDI_OFI_gpu_task {
-    MPIDI_OFI_pipeline_type_t type;
-    MPIDI_OFI_pipeline_status_t status;
-    void *buf;
-    size_t len;
-    MPIR_Request *request;
-    MPIR_gpu_req yreq;
-    struct MPIDI_OFI_gpu_task *next, *prev;
-} MPIDI_OFI_gpu_task_t;
-
-typedef struct MPIDI_OFI_gpu_pending_recv {
-    MPIDI_OFI_gpu_pipeline_request *req;
-    int idx;
-    uint32_t n_chunks;
-    struct MPIDI_OFI_gpu_pending_recv *next, *prev;
-} MPIDI_OFI_gpu_pending_recv_t;
-
-typedef struct MPIDI_OFI_gpu_pending_send {
-    MPIR_Request *sreq;
-    void *send_buf;
-    MPL_pointer_attr_t attr;
-    MPI_Aint offset;
-    uint32_t n_chunks;
-    MPI_Aint left_sz, count;
-    int dt_contig;
-    struct MPIDI_OFI_gpu_pending_send *next, *prev;
-} MPIDI_OFI_gpu_pending_send_t;
-
 typedef union {
     MPID_Thread_mutex_t m;
     char cacheline[MPL_CACHELINE_SIZE];
@@ -491,10 +428,6 @@ typedef struct {
     /* GPU pipeline */
     MPIDU_genq_private_pool_t gpu_pipeline_send_pool;
     MPIDU_genq_private_pool_t gpu_pipeline_recv_pool;
-    MPIDI_OFI_gpu_task_t *gpu_send_task_queue[MPIDI_CH4_MAX_VCIS];
-    MPIDI_OFI_gpu_task_t *gpu_recv_task_queue[MPIDI_CH4_MAX_VCIS];
-    MPIDI_OFI_gpu_pending_recv_t *gpu_recv_queue;
-    MPIDI_OFI_gpu_pending_send_t *gpu_send_queue;
 
     /* Process management and PMI globals */
     int pname_set;
diff --git a/src/mpid/ch4/shm/ipc/gpu/gpu_post.c b/src/mpid/ch4/shm/ipc/gpu/gpu_post.c
index 0ba044992a4..538cfdfb46a 100644
--- a/src/mpid/ch4/shm/ipc/gpu/gpu_post.c
+++ b/src/mpid/ch4/shm/ipc/gpu/gpu_post.c
@@ -453,7 +453,7 @@ int MPIDI_GPU_ipc_fast_memcpy(MPIDI_IPCI_ipc_handle_t ipc_handle, void *dest_vad
 struct gpu_ipc_async {
     MPIR_Request *rreq;
     /* async handle */
-    MPIR_gpu_req yreq;
+    MPIR_async_req async_req;
     /* for unmap */
     void *src_buf;
     MPIDI_GPU_ipc_handle_t gpu_handle;
@@ -465,21 +465,7 @@ static int gpu_ipc_async_poll(MPIR_Async_thing * thing)
     int is_done = 0;
     struct gpu_ipc_async *p = MPIR_Async_thing_get_state(thing);
 
-    switch (p->yreq.type) {
-        case MPIR_NULL_REQUEST:
-            /* a dummy, immediately complete */
-            is_done = 1;
-            break;
-        case MPIR_TYPEREP_REQUEST:
-            MPIR_Typerep_test(p->yreq.u.y_req, &is_done);
-            break;
-        case MPIR_GPU_REQUEST:
-            err = MPL_gpu_test(&p->yreq.u.gpu_req, &is_done);
-            MPIR_Assertp(err == MPL_SUCCESS);
-            break;
-        default:
-            MPIR_Assert(0);
-    }
+    MPIR_async_test(&(p->async_req), &is_done);
 
     if (is_done) {
         int vci = MPIDIG_REQUEST(p->rreq, req->local_vci);
@@ -498,7 +484,7 @@ static int gpu_ipc_async_poll(MPIR_Async_thing * thing)
     return MPIR_ASYNC_THING_NOPROGRESS;
 }
 
-int MPIDI_GPU_ipc_async_start(MPIR_Request * rreq, MPIR_gpu_req * req_p,
+int MPIDI_GPU_ipc_async_start(MPIR_Request * rreq, MPIR_async_req * req_p,
                               void *src_buf, MPIDI_GPU_ipc_handle_t gpu_handle)
 {
     int mpi_errno = MPI_SUCCESS;
@@ -509,9 +495,9 @@ int MPIDI_GPU_ipc_async_start(MPIR_Request * rreq, MPIR_async_req * req_p,
     p->src_buf = src_buf;
     p->gpu_handle = gpu_handle;
     if (req_p) {
-        p->yreq = *req_p;
+        p->async_req = *req_p;
     } else {
-        p->yreq.type = MPIR_NULL_REQUEST;
+        p->async_req.type = MPIR_NULL_REQUEST;
     }
 
     mpi_errno = MPIR_Async_things_add(gpu_ipc_async_poll, p);
diff --git a/src/mpid/ch4/shm/ipc/gpu/gpu_post.h b/src/mpid/ch4/shm/ipc/gpu/gpu_post.h
index 8a113cb5e03..78285de80bd 100644
--- a/src/mpid/ch4/shm/ipc/gpu/gpu_post.h
+++ b/src/mpid/ch4/shm/ipc/gpu/gpu_post.h
@@ -25,7 +25,7 @@ typedef struct {
     int max_subdev_id;
 } MPIDI_GPU_device_info_t;
 
-int MPIDI_GPU_ipc_async_start(MPIR_Request * rreq, MPIR_gpu_req * req_p,
+int MPIDI_GPU_ipc_async_start(MPIR_Request * rreq, MPIR_async_req * req_p,
                               void *src_buf, MPIDI_GPU_ipc_handle_t gpu_handle);
 
 #endif /* GPU_POST_H_INCLUDED */
diff --git a/src/mpid/ch4/shm/ipc/src/ipc_p2p.h b/src/mpid/ch4/shm/ipc/src/ipc_p2p.h
index c658f4e1a09..45f88ff7a5a 100644
--- a/src/mpid/ch4/shm/ipc/src/ipc_p2p.h
+++ b/src/mpid/ch4/shm/ipc/src/ipc_p2p.h
@@ -246,19 +246,19 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_IPCI_handle_lmt_recv(MPIDI_IPC_hdr * ipc_hdr,
             src_count = ipc_hdr->count;
             src_dt = src_dt_ptr->handle;
         }
-        MPIR_gpu_req yreq;
+        MPIR_async_req async_req;
         MPL_gpu_engine_type_t engine =
             MPIDI_IPCI_choose_engine(ipc_hdr->ipc_handle.gpu.global_dev_id, dev_id);
         mpi_errno = MPIR_Ilocalcopy_gpu(src_buf, src_count, src_dt, 0, NULL,
                                         MPIDIG_REQUEST(rreq, buffer), MPIDIG_REQUEST(rreq, count),
                                         MPIDIG_REQUEST(rreq, datatype), 0, &attr,
-                                        MPL_GPU_COPY_DIRECTION_NONE, engine, true, &yreq);
+                                        MPL_GPU_COPY_DIRECTION_NONE, engine, true, &async_req);
         MPIR_ERR_CHECK(mpi_errno);
         if (src_dt_ptr) {
             MPIR_Datatype_free(src_dt_ptr);
         }
-        mpi_errno = MPIDI_GPU_ipc_async_start(rreq, &yreq, src_buf, ipc_hdr->ipc_handle.gpu);
+        mpi_errno = MPIDI_GPU_ipc_async_start(rreq, &async_req, src_buf, ipc_hdr->ipc_handle.gpu);
         MPIR_ERR_CHECK(mpi_errno);
         goto fn_exit;
     } else if (ipc_hdr->ipc_type == MPIDI_IPCI_TYPE__NONE) {