ch4/ofi: move gpu pipeline events into ofi_gpu_pipeline.c
Consolidate the gpu pipeline code.

MPIDI_OFI_gpu_pipeline_request is now an internal struct in
ofi_gpu_pipeline.c, renamed to struct chunk_req.

MPIDI_OFI_gpu_pipeline_recv_copy is now an internal function, renamed to
start_recv_copy.
hzhou committed Feb 6, 2024
1 parent 355058a commit ad31a0f
Showing 4 changed files with 107 additions and 146 deletions.
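
At a glance, ofi_events.c no longer owns the GPU pipeline completion handlers; its dispatch now forwards those completions to the two entry points exported from ofi_gpu_pipeline.c. The sketch below is illustrative only, condensed from the dispatch hunk further down; the surrounding error handling and the other event branches are elided.

    /* sketch: how GPU pipeline completions are routed after this commit */
    int event_id = MPIDI_OFI_REQUEST(req, event_id);
    if (event_id == MPIDI_OFI_EVENT_SEND_GPU_PIPELINE) {
        mpi_errno = MPIDI_OFI_gpu_pipeline_send_event(wc, req);
    } else if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT ||
               event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE) {
        /* both recv events share one handler; it recovers the INIT/non-INIT
         * distinction from the event_id stored in the chunk request */
        mpi_errno = MPIDI_OFI_gpu_pipeline_recv_event(wc, req);
    }

One consequence worth noting: the INIT and non-INIT receive completions no longer need separate callbacks, since MPIDI_OFI_gpu_pipeline_recv_event reads chunk_req->event_id itself.
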
102 changes: 3 additions & 99 deletions src/mpid/ch4/netmod/ofi/ofi_events.c
@@ -80,102 +80,6 @@ static int peek_empty_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request
return MPI_SUCCESS;
}

/* GPU pipeline events */
static int pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r)
{
int mpi_errno = MPI_SUCCESS;
int c;
MPIDI_OFI_gpu_pipeline_request *req;
MPIR_Request *sreq;
void *wc_buf = NULL;
MPIR_FUNC_ENTER;

req = (MPIDI_OFI_gpu_pipeline_request *) r;
/* get original mpi request */
sreq = req->parent;
wc_buf = req->buf;
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, wc_buf);

MPIR_cc_decr(sreq->cc_ptr, &c);
if (c == 0) {
MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype));
MPIR_Request_free(sreq);
}
MPL_free(r);

MPIR_FUNC_EXIT;
return mpi_errno;
}

static int pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r, int event_id)
{
int mpi_errno = MPI_SUCCESS;
int i;
MPIDI_OFI_gpu_pipeline_request *req;
MPIR_Request *rreq;
void *wc_buf = NULL;
int in_use MPL_UNUSED;

MPIR_FUNC_ENTER;

req = (MPIDI_OFI_gpu_pipeline_request *) r;
rreq = req->parent;
wc_buf = req->buf;
MPL_free(r);

void *recv_buf = MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf);
size_t recv_count = MPIDI_OFI_REQUEST(rreq, noncontig.pack.count);
MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype);

if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) {
rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true);
rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data);
rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag);

if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = true;
}

uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data);
uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data);
if (likely(packed == 0)) {
if (wc->len > 0) {
MPIR_Assert(n_chunks == 0);
/* First chunk arrives. */
mpi_errno = MPIDI_OFI_gpu_pipeline_recv_copy(rreq, wc_buf, wc->len,
recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
} else {
/* free this chunk */
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, wc_buf);
MPIR_Assert(n_chunks > 0);
/* Post recv for remaining chunks. */
MPIR_cc_dec(rreq->cc_ptr);
for (i = 0; i < n_chunks; i++) {
mpi_errno = MPIDI_OFI_gpu_pipeline_recv(rreq, i, n_chunks);
MPIR_ERR_CHECK(mpi_errno);
}
}
} else {
MPIR_ERR_CHKANDJUMP(true, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed");
}
} else {
if (likely(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE)) {
mpi_errno = MPIDI_OFI_gpu_pipeline_recv_copy(rreq, wc_buf, wc->len,
recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
} else {
MPIR_ERR_CHKANDJUMP(true, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed");
}
}
fn_exit:
MPIR_FUNC_EXIT;
return mpi_errno;
fn_fail:
rreq->status.MPI_ERROR = mpi_errno;
goto fn_exit;
}

static int send_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * sreq)
{
int mpi_errno = MPI_SUCCESS;
@@ -567,13 +471,13 @@ int MPIDI_OFI_dispatch_function(int vci, struct fi_cq_tagged_entry *wc, MPIR_Req
mpi_errno = am_read_event(vci, wc, req);
goto fn_exit;
} else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_SEND_GPU_PIPELINE) {
mpi_errno = pipeline_send_event(wc, req);
mpi_errno = MPIDI_OFI_gpu_pipeline_send_event(wc, req);
goto fn_exit;
} else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) {
mpi_errno = pipeline_recv_event(wc, req, MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT);
mpi_errno = MPIDI_OFI_gpu_pipeline_recv_event(wc, req);
goto fn_exit;
} else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE) {
mpi_errno = pipeline_recv_event(wc, req, MPIDI_OFI_EVENT_RECV_GPU_PIPELINE);
mpi_errno = MPIDI_OFI_gpu_pipeline_recv_event(wc, req);
goto fn_exit;
} else if (unlikely(1)) {
switch (MPIDI_OFI_REQUEST(req, event_id)) {
110 changes: 102 additions & 8 deletions src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c
@@ -6,6 +6,19 @@
#include "mpidimpl.h"
#include "mpir_async_things.h"

struct chunk_req {
char pad[MPIDI_REQUEST_HDR_SIZE];
struct fi_context context[MPIDI_OFI_CONTEXT_STRUCTS]; /* fixed field, do not move */
int event_id; /* fixed field, do not move */
MPIR_Request *parent; /* Parent request */
void *buf;
};

static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq,
const void *buf, MPI_Aint chunk_sz);
static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype);

/* ------------------------------------
* send_alloc: allocate send chunks and start copy, may postpone as async
*/
@@ -20,8 +33,6 @@ struct send_alloc {
};

static int send_alloc_poll(MPIR_Async_thing * thing);
static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq,
const void *buf, MPI_Aint chunk_sz);

int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
MPI_Aint count, MPI_Datatype datatype,
@@ -144,8 +155,7 @@ static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint ch
int mpi_errno = MPI_SUCCESS;
int vci_local = MPIDI_OFI_REQUEST(sreq, pipeline_info.vci_local);

MPIDI_OFI_gpu_pipeline_request *chunk_req = (MPIDI_OFI_gpu_pipeline_request *)
MPL_malloc(sizeof(MPIDI_OFI_gpu_pipeline_request), MPL_MEM_BUFFER);
struct chunk_req *chunk_req = MPL_malloc(sizeof(struct chunk_req), MPL_MEM_BUFFER);
MPIR_Assertp(chunk_req);

chunk_req->parent = sreq;
@@ -170,12 +180,36 @@ static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint ch
MPIR_Assert(0);
}

/* ------------------------------------
* send_event: callback for MPIDI_OFI_dispatch_function in ofi_events.c
*/
int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r)
{
int mpi_errno = MPI_SUCCESS;

struct chunk_req *chunk_req = (void *) r;
MPIR_Request *sreq = chunk_req->parent;
void *host_buf = chunk_req->buf;
MPL_free(chunk_req);

MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, host_buf);

int c;
MPIR_cc_decr(sreq->cc_ptr, &c);
if (c == 0) {
MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype));
MPIR_Request_free(sreq);
}

return mpi_errno;
}

/* ------------------------------------
* recv_alloc: allocate recv chunk buffer and post fi_trecv
*/
struct recv_alloc {
MPIR_Request *rreq;
MPIDI_OFI_gpu_pipeline_request *chunk_req;
struct chunk_req *chunk_req;
int idx;
int n_chunks;
};
@@ -220,7 +254,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
uint64_t match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.match_bits);
uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.mask_bits);

MPIDI_OFI_gpu_pipeline_request *chunk_req;
struct chunk_req *chunk_req;
chunk_req = MPL_malloc(sizeof(*chunk_req), MPL_MEM_BUFFER);
MPIR_Assert(chunk_req);

@@ -251,6 +285,66 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
return MPIR_ASYNC_THING_NOPROGRESS;
};

/* ------------------------------------
* recv_event: callback for MPIDI_OFI_dispatch_function in ofi_events.c
*/
int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r)
{
int mpi_errno = MPI_SUCCESS;

struct chunk_req *chunk_req = (void *) r;
int event_id = chunk_req->event_id;
MPIR_Request *rreq = chunk_req->parent;
void *host_buf = chunk_req->buf;

MPL_free(chunk_req);

void *recv_buf = MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf);
size_t recv_count = MPIDI_OFI_REQUEST(rreq, noncontig.pack.count);
MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype);

if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) {
rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true);
rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data);
rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag);

if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = true;
}

uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data);
uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data);
/* ? - Not sure why sender cannot send packed data */
MPIR_Assertp(packed == 0);
if (wc->len > 0) {
/* message from a normal send */
MPIR_Assert(n_chunks == 0);
mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
} else {
MPIR_Assert(n_chunks > 0);
/* There is no data in the init chunk, free the buffer */
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf);
MPIR_cc_dec(rreq->cc_ptr);
/* Post recv for the remaining chunks. */
for (int i = 0; i < n_chunks; i++) {
mpi_errno = MPIDI_OFI_gpu_pipeline_recv(rreq, i, n_chunks);
MPIR_ERR_CHECK(mpi_errno);
}
}
} else {
MPIR_Assert(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE);
mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
}

fn_exit:
return mpi_errno;
fn_fail:
rreq->status.MPI_ERROR = mpi_errno;
goto fn_exit;
}

/* ------------------------------------
* recv_copy: async copy from host_buf to user buffer in recv event
*/
@@ -265,8 +359,8 @@ struct recv_copy {
static int recv_copy_poll(MPIR_Async_thing * thing);
static void recv_copy_complete(MPIR_Request * rreq, void *buf);

int MPIDI_OFI_gpu_pipeline_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype)
static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype)
{
int mpi_errno = MPI_SUCCESS;

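The fi_trecv posting itself sits in the collapsed portion of recv_alloc_poll, but the role of struct chunk_req is clear from the visible hunks: it is the per-chunk operation context handed to libfabric, carrying the parent request and the staging buffer back to the completion handler. A hedged sketch of what that posting would look like follows; the endpoint (ep), source address (remote_addr), staging buffer (host_buf), and chunk size (chunk_sz) are placeholder names, not taken from the hidden lines.

    /* sketch: posting a chunk receive with chunk_req as the fi_trecv context */
    chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE;
    chunk_req->parent = rreq;
    chunk_req->buf = host_buf;  /* cell from MPIDI_OFI_global.gpu_pipeline_recv_pool */
    ret = fi_trecv(ep, host_buf, chunk_sz, NULL /* desc */,
                   remote_addr, match_bits, mask_bits,
                   (void *) &chunk_req->context);

When the completion arrives, MPIDI_OFI_dispatch_function hands the request back to MPIDI_OFI_gpu_pipeline_recv_event, which frees the chunk_req and either copies the chunk into the user buffer via start_recv_copy or, for an empty init chunk, posts the receives for the remaining chunks.
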
4 changes: 2 additions & 2 deletions src/mpid/ch4/netmod/ofi/ofi_impl.h
@@ -831,8 +831,8 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
MPI_Aint count, MPI_Datatype datatype,
MPL_pointer_attr_t attr, MPI_Aint data_sz);
int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq, int idx, int n_chunks);
int MPIDI_OFI_gpu_pipeline_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype);
int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);

MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_gpu_pipeline_chunk_size(size_t data_sz)
{
37 changes: 0 additions & 37 deletions src/mpid/ch4/netmod/ofi/ofi_types.h
@@ -337,43 +337,6 @@ typedef struct {
struct fid_cq *cq;
} MPIDI_OFI_context_t;

/* GPU pipelining */
typedef struct {
char pad[MPIDI_REQUEST_HDR_SIZE];
struct fi_context context[MPIDI_OFI_CONTEXT_STRUCTS]; /* fixed field, do not move */
int event_id; /* fixed field, do not move */
MPIR_Request *parent; /* Parent request */
void *buf;
} MPIDI_OFI_gpu_pipeline_request;

typedef struct MPIDI_OFI_gpu_task {
MPIDI_OFI_pipeline_type_t type;
MPIDI_OFI_pipeline_status_t status;
void *buf;
size_t len;
MPIR_Request *request;
MPIR_async_req async_req;
struct MPIDI_OFI_gpu_task *next, *prev;
} MPIDI_OFI_gpu_task_t;

typedef struct MPIDI_OFI_gpu_pending_recv {
MPIDI_OFI_gpu_pipeline_request *req;
int idx;
uint32_t n_chunks;
struct MPIDI_OFI_gpu_pending_recv *next, *prev;
} MPIDI_OFI_gpu_pending_recv_t;

typedef struct MPIDI_OFI_gpu_pending_send {
MPIR_Request *sreq;
void *send_buf;
MPL_pointer_attr_t attr;
MPI_Aint offset;
uint32_t n_chunks;
MPI_Aint left_sz, count;
int dt_contig;
struct MPIDI_OFI_gpu_pending_send *next, *prev;
} MPIDI_OFI_gpu_pending_send_t;

typedef union {
MPID_Thread_mutex_t m;
char cacheline[MPL_CACHELINE_SIZE];
