Skip to content

Commit

Permalink
ch4/ofi: move all gpu pipeline code into ofi_gpu_pipeline.c
Browse files Browse the repository at this point in the history
Move all gpu pipeline specific code into ofi_gpu_pipeline.c.

Make a new function MPIDI_OFI_gpu_pipeline_recv that fills rreq with
persistent pipeline_info data. Rename the original
MPIDI_OFI_gpu_pipeline_recv into static function start_recv_chunk.
  • Loading branch information
hzhou committed Feb 6, 2024
1 parent ad31a0f commit 1c8c6fa
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 67 deletions.
81 changes: 76 additions & 5 deletions src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct chunk_req {

static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq,
const void *buf, MPI_Aint chunk_sz);
static int start_recv_chunk(MPIR_Request * rreq, int idx, int n_chunks);
static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype);

Expand All @@ -28,18 +29,47 @@ struct send_alloc {
MPI_Aint count;
MPI_Datatype datatype;
MPL_pointer_attr_t attr;
MPI_Aint offset, left_sz;
MPI_Aint offset, left_sz, chunk_sz;
int n_chunks;
};

static int send_alloc_poll(MPIR_Async_thing * thing);

int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
MPI_Aint count, MPI_Datatype datatype,
MPL_pointer_attr_t attr, MPI_Aint data_sz)
MPL_pointer_attr_t attr, MPI_Aint data_sz,
uint64_t cq_data, fi_addr_t remote_addr,
int vci_local, int ctx_idx, uint64_t match_bits)
{
int mpi_errno = MPI_SUCCESS;

uint32_t n_chunks = 0;
uint64_t is_packed = 0; /* always 0 ? */
MPI_Aint chunk_sz = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ;
if (data_sz <= chunk_sz) {
/* data fits in a single chunk */
chunk_sz = data_sz;
n_chunks = 1;
} else {
n_chunks = data_sz / chunk_sz;
if (data_sz % chunk_sz > 0) {
n_chunks++;
}
}
MPIDI_OFI_idata_set_gpuchunk_bits(&cq_data, n_chunks);
MPIDI_OFI_idata_set_gpu_packed_bit(&cq_data, is_packed);

MPIDI_OFI_REQUEST(sreq, pipeline_info.cq_data) = cq_data;
MPIDI_OFI_REQUEST(sreq, pipeline_info.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(sreq, pipeline_info.vci_local) = vci_local;
MPIDI_OFI_REQUEST(sreq, pipeline_info.ctx_idx) = ctx_idx;
MPIDI_OFI_REQUEST(sreq, pipeline_info.match_bits) = match_bits;
MPIDI_OFI_REQUEST(sreq, pipeline_info.data_sz) = data_sz;

/* Send the initial empty packet for matching */
MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx, NULL, 0, cq_data,
remote_addr, match_bits), vci_local, tinjectdata);

struct send_alloc *p;
p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER);
MPIR_Assert(p);
Expand All @@ -50,12 +80,17 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
p->datatype = datatype;
p->attr = attr;
p->left_sz = data_sz;
p->chunk_sz = chunk_sz;
p->offset = 0;
p->n_chunks = 0;

mpi_errno = MPIR_Async_things_add(send_alloc_poll, p);
/* TODO: kick the progress right away */

fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}

static int send_alloc_poll(MPIR_Async_thing * thing)
Expand All @@ -70,7 +105,7 @@ static int send_alloc_poll(MPIR_Async_thing * thing)
return (num_new_chunks == 0) ? MPIR_ASYNC_THING_NOPROGRESS : MPIR_ASYNC_THING_UPDATED;
}
MPIR_async_req async_req;
MPI_Aint chunk_sz = MPL_MIN(p->left_sz, MPIDI_OFI_REQUEST(p->sreq, pipeline_info.chunk_sz));
MPI_Aint chunk_sz = MPL_MIN(p->left_sz, p->chunk_sz);
MPL_gpu_engine_type_t engine_type =
MPIDI_OFI_gpu_get_send_engine_type(MPIR_CVAR_CH4_OFI_GPU_SEND_ENGINE_TYPE);
int commit = p->left_sz <= chunk_sz ? 1 : 0;
Expand Down Expand Up @@ -216,7 +251,43 @@ struct recv_alloc {

static int recv_alloc_poll(MPIR_Async_thing * thing);

int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq, int idx, int n_chunks)
int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype,
fi_addr_t remote_addr, int vci_local,
uint64_t match_bits, uint64_t mask_bits,
MPI_Aint data_sz, int ctx_idx)
{
int mpi_errno = MPI_SUCCESS;

MPIDI_OFI_REQUEST(rreq, pipeline_info.offset) = 0;
MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = false;
MPIDI_OFI_REQUEST(rreq, pipeline_info.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(rreq, pipeline_info.vci_local) = vci_local;
MPIDI_OFI_REQUEST(rreq, pipeline_info.match_bits) = match_bits;
MPIDI_OFI_REQUEST(rreq, pipeline_info.mask_bits) = mask_bits;
MPIDI_OFI_REQUEST(rreq, pipeline_info.data_sz) = data_sz;
MPIDI_OFI_REQUEST(rreq, pipeline_info.ctx_idx) = ctx_idx;

/* Save original buf, datatype and count */
MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf) = recv_buf;
MPIDI_OFI_REQUEST(rreq, noncontig.pack.count) = count;
MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype) = datatype;

struct recv_alloc *p;
p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER);
MPIR_Assert(p);

p->rreq = rreq;
p->idx = 0;
p->n_chunks = -1; /* it's MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT */

mpi_errno = MPIR_Async_things_add(recv_alloc_poll, p);

return mpi_errno;
}

/* this is called from recv_event */
static int start_recv_chunk(MPIR_Request * rreq, int idx, int n_chunks)
{
int mpi_errno = MPI_SUCCESS;

Expand Down Expand Up @@ -328,7 +399,7 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques
MPIR_cc_dec(rreq->cc_ptr);
/* Post recv for the remaining chunks. */
for (int i = 0; i < n_chunks; i++) {
mpi_errno = MPIDI_OFI_gpu_pipeline_recv(rreq, i, n_chunks);
mpi_errno = start_recv_chunk(rreq, i, n_chunks);
MPIR_ERR_CHECK(mpi_errno);
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/mpid/ch4/netmod/ofi/ofi_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -829,18 +829,15 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_gpu_free_pack_buffer(void *ptr)

int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
MPI_Aint count, MPI_Datatype datatype,
MPL_pointer_attr_t attr, MPI_Aint data_sz);
int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq, int idx, int n_chunks);
MPL_pointer_attr_t attr, MPI_Aint data_sz,
uint64_t cq_data, fi_addr_t remote_addr,
int vci_local, int ctx_idx, uint64_t match_bits);
int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype,
fi_addr_t remote_addr, int vci_local,
uint64_t match_bits, uint64_t mask_bits,
MPI_Aint data_sz, int ctx_idx);
int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);

MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_gpu_pipeline_chunk_size(size_t data_sz)
{
int chunk_size = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ;
if (data_sz <= MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ) {
chunk_size = data_sz;
}
return chunk_size;
}

#endif /* OFI_IMPL_H_INCLUDED */
19 changes: 3 additions & 16 deletions src/mpid/ch4/netmod/ofi/ofi_recv.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,27 +233,14 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_do_irecv(void *buf,
remote_addr = MPIDI_OFI_av_to_phys(addr, sender_nic, vci_remote);
}

/* Save pipeline information. */
MPIDI_OFI_REQUEST(rreq, pipeline_info.offset) = 0;
MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = false;
MPIDI_OFI_REQUEST(rreq, pipeline_info.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(rreq, pipeline_info.vci_local) = vci_local;
MPIDI_OFI_REQUEST(rreq, pipeline_info.match_bits) = match_bits;
MPIDI_OFI_REQUEST(rreq, pipeline_info.mask_bits) = mask_bits;
MPIDI_OFI_REQUEST(rreq, pipeline_info.data_sz) = data_sz;
MPIDI_OFI_REQUEST(rreq, pipeline_info.ctx_idx) = ctx_idx;

/* Save original buf, datatype and count */
MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf) = buf;
MPIDI_OFI_REQUEST(rreq, noncontig.pack.count) = count;
MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype) = datatype;

if (rreq->comm == NULL) {
rreq->comm = comm;
MPIR_Comm_add_ref(comm);
}

mpi_errno = MPIDI_OFI_gpu_pipeline_recv(rreq, 0, -1);
mpi_errno = MPIDI_OFI_gpu_pipeline_recv(rreq, buf, count, datatype,
remote_addr, vci_local,
match_bits, mask_bits, data_sz, ctx_idx);
goto fn_exit;
}

Expand Down
40 changes: 5 additions & 35 deletions src/mpid/ch4/netmod/ofi/ofi_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,43 +274,13 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou
if (force_gpu_pack && MPIR_CVAR_CH4_OFI_ENABLE_GPU_PIPELINE &&
data_sz >= MPIR_CVAR_CH4_OFI_GPU_PIPELINE_THRESHOLD) {
/* Pipeline path */
uint32_t n_chunks = 0;
int chunk_size = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ;
if (dt_contig) {
/* Update correct number of chunks in immediate data. */
chunk_size = MPIDI_OFI_gpu_pipeline_chunk_size(data_sz);
n_chunks = data_sz / chunk_size;
if (data_sz % chunk_size)
n_chunks++;
MPIDI_OFI_idata_set_gpuchunk_bits(&cq_data, n_chunks);
}
fi_addr_t remote_addr = MPIDI_OFI_av_to_phys(addr, receiver_nic, vci_remote);
mpi_errno = MPIDI_OFI_gpu_pipeline_send(sreq, buf, count, datatype, attr, data_sz,
cq_data, remote_addr, vci_local, ctx_idx,
match_bits);
MPIR_ERR_CHECK(mpi_errno);

/* Update sender packed bit if necessary. */
uint64_t is_packed = datatype == MPI_PACKED ? 1 : 0;
MPIDI_OFI_idata_set_gpu_packed_bit(&cq_data, is_packed);
MPIR_ERR_CHKANDJUMP(is_packed, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed");

/* Save pipeline information. */
MPIDI_OFI_REQUEST(sreq, pipeline_info.chunk_sz) = chunk_size;
MPIDI_OFI_REQUEST(sreq, pipeline_info.cq_data) = cq_data;
MPIDI_OFI_REQUEST(sreq, pipeline_info.remote_addr) =
MPIDI_OFI_av_to_phys(addr, receiver_nic, vci_remote);
MPIDI_OFI_REQUEST(sreq, pipeline_info.vci_local) = vci_local;
MPIDI_OFI_REQUEST(sreq, pipeline_info.ctx_idx) = ctx_idx;
MPIDI_OFI_REQUEST(sreq, pipeline_info.match_bits) = match_bits;
MPIDI_OFI_REQUEST(sreq, pipeline_info.data_sz) = data_sz;

/* send an empty message for tag matching */
MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx,
NULL,
0,
cq_data,
MPIDI_OFI_REQUEST(sreq, pipeline_info.remote_addr),
match_bits), vci_local, tinjectdata);
MPIR_T_PVAR_COUNTER_INC(MULTINIC, nic_sent_bytes_count[sender_nic], data_sz);

MPIDI_OFI_gpu_pipeline_send(sreq, buf, count, datatype, attr, data_sz);

goto fn_exit;
}

Expand Down

0 comments on commit 1c8c6fa

Please sign in to comment.