From ad31a0fe375ad846e14b1eba75b8a9f397684489 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 5 Feb 2024 10:43:37 -0600 Subject: [PATCH] ch4/ofi: move gpu pipeline events into ofi_gpu_pipeline.c Consolidate the gpu pipeline code. MPIDI_OFI_gpu_pipeline_request is now an internal struct in ofi_gpu_pipeline.c, rename to struct chunk_req. MPIDI_OFI_gpu_pipeline_recv_copy is now an internal function, rename to start_recv_copy. --- src/mpid/ch4/netmod/ofi/ofi_events.c | 102 +------------------ src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c | 110 +++++++++++++++++++-- src/mpid/ch4/netmod/ofi/ofi_impl.h | 4 +- src/mpid/ch4/netmod/ofi/ofi_types.h | 37 ------- 4 files changed, 107 insertions(+), 146 deletions(-) diff --git a/src/mpid/ch4/netmod/ofi/ofi_events.c b/src/mpid/ch4/netmod/ofi/ofi_events.c index 85042ce27a3..4f31c755ed3 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_events.c +++ b/src/mpid/ch4/netmod/ofi/ofi_events.c @@ -80,102 +80,6 @@ static int peek_empty_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request return MPI_SUCCESS; } -/* GPU pipeline events */ -static int pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r) -{ - int mpi_errno = MPI_SUCCESS; - int c; - MPIDI_OFI_gpu_pipeline_request *req; - MPIR_Request *sreq; - void *wc_buf = NULL; - MPIR_FUNC_ENTER; - - req = (MPIDI_OFI_gpu_pipeline_request *) r; - /* get original mpi request */ - sreq = req->parent; - wc_buf = req->buf; - MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, wc_buf); - - MPIR_cc_decr(sreq->cc_ptr, &c); - if (c == 0) { - MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype)); - MPIR_Request_free(sreq); - } - MPL_free(r); - - MPIR_FUNC_EXIT; - return mpi_errno; -} - -static int pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r, int event_id) -{ - int mpi_errno = MPI_SUCCESS; - int i; - MPIDI_OFI_gpu_pipeline_request *req; - MPIR_Request *rreq; - void *wc_buf = NULL; - int in_use MPL_UNUSED; - - MPIR_FUNC_ENTER; - - req = (MPIDI_OFI_gpu_pipeline_request *) r; - rreq = req->parent; - wc_buf = req->buf; - MPL_free(r); - - void *recv_buf = MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf); - size_t recv_count = MPIDI_OFI_REQUEST(rreq, noncontig.pack.count); - MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype); - - if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) { - rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true); - rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data); - rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag); - - if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) { - MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = true; - } - - uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data); - uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data); - if (likely(packed == 0)) { - if (wc->len > 0) { - MPIR_Assert(n_chunks == 0); - /* First chunk arrives. */ - mpi_errno = MPIDI_OFI_gpu_pipeline_recv_copy(rreq, wc_buf, wc->len, - recv_buf, recv_count, datatype); - MPIR_ERR_CHECK(mpi_errno); - } else { - /* free this chunk */ - MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, wc_buf); - MPIR_Assert(n_chunks > 0); - /* Post recv for remaining chunks. */ - MPIR_cc_dec(rreq->cc_ptr); - for (i = 0; i < n_chunks; i++) { - mpi_errno = MPIDI_OFI_gpu_pipeline_recv(rreq, i, n_chunks); - MPIR_ERR_CHECK(mpi_errno); - } - } - } else { - MPIR_ERR_CHKANDJUMP(true, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed"); - } - } else { - if (likely(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE)) { - mpi_errno = MPIDI_OFI_gpu_pipeline_recv_copy(rreq, wc_buf, wc->len, - recv_buf, recv_count, datatype); - MPIR_ERR_CHECK(mpi_errno); - } else { - MPIR_ERR_CHKANDJUMP(true, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed"); - } - } - fn_exit: - MPIR_FUNC_EXIT; - return mpi_errno; - fn_fail: - rreq->status.MPI_ERROR = mpi_errno; - goto fn_exit; -} - static int send_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * sreq) { int mpi_errno = MPI_SUCCESS; @@ -567,13 +471,13 @@ int MPIDI_OFI_dispatch_function(int vci, struct fi_cq_tagged_entry *wc, MPIR_Req mpi_errno = am_read_event(vci, wc, req); goto fn_exit; } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_SEND_GPU_PIPELINE) { - mpi_errno = pipeline_send_event(wc, req); + mpi_errno = MPIDI_OFI_gpu_pipeline_send_event(wc, req); goto fn_exit; } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) { - mpi_errno = pipeline_recv_event(wc, req, MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT); + mpi_errno = MPIDI_OFI_gpu_pipeline_recv_event(wc, req); goto fn_exit; } else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE) { - mpi_errno = pipeline_recv_event(wc, req, MPIDI_OFI_EVENT_RECV_GPU_PIPELINE); + mpi_errno = MPIDI_OFI_gpu_pipeline_recv_event(wc, req); goto fn_exit; } else if (unlikely(1)) { switch (MPIDI_OFI_REQUEST(req, event_id)) { diff --git a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c index 6427a3fb740..191b1f3b794 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c +++ b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c @@ -6,6 +6,19 @@ #include "mpidimpl.h" #include "mpir_async_things.h" +struct chunk_req { + char pad[MPIDI_REQUEST_HDR_SIZE]; + struct fi_context context[MPIDI_OFI_CONTEXT_STRUCTS]; /* fixed field, do not move */ + int event_id; /* fixed field, do not move */ + MPIR_Request *parent; /* Parent request */ + void *buf; +}; + +static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq, + const void *buf, MPI_Aint chunk_sz); +static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz, + void *recv_buf, MPI_Aint count, MPI_Datatype datatype); + /* ------------------------------------ * send_alloc: allocate send chunks and start copy, may postpone as async */ @@ -20,8 +33,6 @@ struct send_alloc { }; static int send_alloc_poll(MPIR_Async_thing * thing); -static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq, - const void *buf, MPI_Aint chunk_sz); int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, MPI_Aint count, MPI_Datatype datatype, @@ -144,8 +155,7 @@ static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint ch int mpi_errno = MPI_SUCCESS; int vci_local = MPIDI_OFI_REQUEST(sreq, pipeline_info.vci_local); - MPIDI_OFI_gpu_pipeline_request *chunk_req = (MPIDI_OFI_gpu_pipeline_request *) - MPL_malloc(sizeof(MPIDI_OFI_gpu_pipeline_request), MPL_MEM_BUFFER); + struct chunk_req *chunk_req = MPL_malloc(sizeof(struct chunk_req), MPL_MEM_BUFFER); MPIR_Assertp(chunk_req); chunk_req->parent = sreq; @@ -170,12 +180,36 @@ static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint ch MPIR_Assert(0); } +/* ------------------------------------ + * send_event: callback for MPIDI_OFI_dispatch_function in ofi_events.c + */ +int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r) +{ + int mpi_errno = MPI_SUCCESS; + + struct chunk_req *chunk_req = (void *) r; + MPIR_Request *sreq = chunk_req->parent;; + void *host_buf = chunk_req->buf; + MPL_free(chunk_req); + + MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, host_buf); + + int c; + MPIR_cc_decr(sreq->cc_ptr, &c); + if (c == 0) { + MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype)); + MPIR_Request_free(sreq); + } + + return mpi_errno; +} + /* ------------------------------------ * recv_alloc: allocate recv chunk buffer and post fi_trecv */ struct recv_alloc { MPIR_Request *rreq; - MPIDI_OFI_gpu_pipeline_request *chunk_req; + struct chunk_req *chunk_req; int idx; int n_chunks; }; @@ -220,7 +254,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) uint64_t match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.match_bits); uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.mask_bits); - MPIDI_OFI_gpu_pipeline_request *chunk_req; + struct chunk_req *chunk_req; chunk_req = MPL_malloc(sizeof(*chunk_req), MPL_MEM_BUFFER); MPIR_Assert(chunk_req); @@ -251,6 +285,66 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) return MPIR_ASYNC_THING_NOPROGRESS; }; +/* ------------------------------------ + * recv_event: callback for MPIDI_OFI_dispatch_function in ofi_events.c + */ +int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r) +{ + int mpi_errno = MPI_SUCCESS; + + struct chunk_req *chunk_req = (void *) r; + int event_id = chunk_req->event_id; + MPIR_Request *rreq = chunk_req->parent; + void *host_buf = chunk_req->buf; + + MPL_free(chunk_req); + + void *recv_buf = MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf); + size_t recv_count = MPIDI_OFI_REQUEST(rreq, noncontig.pack.count); + MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype); + + if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) { + rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true); + rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data); + rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag); + + if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) { + MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = true; + } + + uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data); + uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data); + /* ? - Not sure why sender cannot send packed data */ + MPIR_Assertp(packed == 0); + if (wc->len > 0) { + /* message from a normal send */ + MPIR_Assert(n_chunks == 0); + mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); + MPIR_ERR_CHECK(mpi_errno); + } else { + MPIR_Assert(n_chunks > 0); + /* There is no data in the init chunk, free the buffer */ + MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf); + MPIR_cc_dec(rreq->cc_ptr); + /* Post recv for the remaining chunks. */ + for (int i = 0; i < n_chunks; i++) { + mpi_errno = MPIDI_OFI_gpu_pipeline_recv(rreq, i, n_chunks); + MPIR_ERR_CHECK(mpi_errno); + } + } + } else { + MPIR_Assert(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE); + mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); + MPIR_ERR_CHECK(mpi_errno); + } + + fn_exit: + return mpi_errno; + fn_fail: + rreq->status.MPI_ERROR = mpi_errno; + goto fn_exit; +} + /* ------------------------------------ * recv_copy: async copy from host_buf to user buffer in recv event */ @@ -265,8 +359,8 @@ struct recv_copy { static int recv_copy_poll(MPIR_Async_thing * thing); static void recv_copy_complete(MPIR_Request * rreq, void *buf); -int MPIDI_OFI_gpu_pipeline_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz, - void *recv_buf, MPI_Aint count, MPI_Datatype datatype) +static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz, + void *recv_buf, MPI_Aint count, MPI_Datatype datatype) { int mpi_errno = MPI_SUCCESS; diff --git a/src/mpid/ch4/netmod/ofi/ofi_impl.h b/src/mpid/ch4/netmod/ofi/ofi_impl.h index 66478af9abe..50280106362 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_impl.h +++ b/src/mpid/ch4/netmod/ofi/ofi_impl.h @@ -831,8 +831,8 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, MPI_Aint count, MPI_Datatype datatype, MPL_pointer_attr_t attr, MPI_Aint data_sz); int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq, int idx, int n_chunks); -int MPIDI_OFI_gpu_pipeline_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz, - void *recv_buf, MPI_Aint count, MPI_Datatype datatype); +int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r); +int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r); MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_gpu_pipeline_chunk_size(size_t data_sz) { diff --git a/src/mpid/ch4/netmod/ofi/ofi_types.h b/src/mpid/ch4/netmod/ofi/ofi_types.h index 0466c613e7f..46f49ea4478 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_types.h +++ b/src/mpid/ch4/netmod/ofi/ofi_types.h @@ -337,43 +337,6 @@ typedef struct { struct fid_cq *cq; } MPIDI_OFI_context_t; -/* GPU pipelining */ -typedef struct { - char pad[MPIDI_REQUEST_HDR_SIZE]; - struct fi_context context[MPIDI_OFI_CONTEXT_STRUCTS]; /* fixed field, do not move */ - int event_id; /* fixed field, do not move */ - MPIR_Request *parent; /* Parent request */ - void *buf; -} MPIDI_OFI_gpu_pipeline_request; - -typedef struct MPIDI_OFI_gpu_task { - MPIDI_OFI_pipeline_type_t type; - MPIDI_OFI_pipeline_status_t status; - void *buf; - size_t len; - MPIR_Request *request; - MPIR_async_req async_req; - struct MPIDI_OFI_gpu_task *next, *prev; -} MPIDI_OFI_gpu_task_t; - -typedef struct MPIDI_OFI_gpu_pending_recv { - MPIDI_OFI_gpu_pipeline_request *req; - int idx; - uint32_t n_chunks; - struct MPIDI_OFI_gpu_pending_recv *next, *prev; -} MPIDI_OFI_gpu_pending_recv_t; - -typedef struct MPIDI_OFI_gpu_pending_send { - MPIR_Request *sreq; - void *send_buf; - MPL_pointer_attr_t attr; - MPI_Aint offset; - uint32_t n_chunks; - MPI_Aint left_sz, count; - int dt_contig; - struct MPIDI_OFI_gpu_pending_send *next, *prev; -} MPIDI_OFI_gpu_pending_send_t; - typedef union { MPID_Thread_mutex_t m; char cacheline[MPL_CACHELINE_SIZE];