Skip to content

Commit

Permalink
ch4/ofi: use internal tag for pipeline chunk match_bits
Browse files Browse the repository at this point in the history
Follow a similar approach as nonblocking collectives, internal pipeline
chunks use separate tag space (MPIDI_OFI_GPU_PIPELINE_SEND) and
incrementing tags to avoid mismatch with regular messages.
  • Loading branch information
hzhou committed Feb 6, 2024
1 parent 7cbc315 commit 4632550
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 48 deletions.
3 changes: 3 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ int MPIDI_OFI_mpi_comm_commit_pre_hook(MPIR_Comm * comm)
MPIDI_OFI_COMM(comm).enable_hashing = 0;
MPIDI_OFI_COMM(comm).pref_nic = NULL;

/* Initialize tag for gpu_pipeline chunks; incremented by sender. */
MPIDI_OFI_COMM(comm).pipeline_tag = 0;

if (comm->hints[MPIR_COMM_HINT_ENABLE_MULTI_NIC_STRIPING] == -1) {
comm->hints[MPIR_COMM_HINT_ENABLE_MULTI_NIC_STRIPING] =
MPIR_CVAR_CH4_OFI_ENABLE_MULTI_NIC_STRIPING;
Expand Down
45 changes: 27 additions & 18 deletions src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ struct chunk_req {
void *buf;
};

struct pipeline_header {
int n_chunks;
int pipeline_tag;
};

static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq,
const void *buf, MPI_Aint chunk_sz);
static int start_recv_chunk(MPIR_Request * rreq, int idx, int n_chunks);
Expand All @@ -39,12 +44,11 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
MPI_Aint count, MPI_Datatype datatype,
MPL_pointer_attr_t attr, MPI_Aint data_sz,
uint64_t cq_data, fi_addr_t remote_addr,
int vci_local, int ctx_idx, uint64_t match_bits)
int vci_local, int ctx_idx, uint64_t match_bits, int pipeline_tag)
{
int mpi_errno = MPI_SUCCESS;

uint32_t n_chunks = 0;
uint64_t is_packed = 0; /* always 0 ? */
MPI_Aint chunk_sz = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ;
if (data_sz <= chunk_sz) {
/* data fits in a single chunk */
Expand All @@ -56,18 +60,22 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
n_chunks++;
}
}
MPIDI_OFI_idata_set_gpuchunk_bits(&cq_data, n_chunks);
MPIDI_OFI_idata_set_gpu_packed_bit(&cq_data, is_packed);

MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) = n_chunks;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data) = cq_data;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.vci_local) = vci_local;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.ctx_idx) = ctx_idx;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.match_bits) = match_bits;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.pipeline_tag) = pipeline_tag;

struct pipeline_header hdr;
hdr.n_chunks = n_chunks;
hdr.pipeline_tag = pipeline_tag;

/* Send the initial empty packet for matching */
MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx, NULL, 0, cq_data,
MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx,
&hdr, sizeof(hdr), cq_data | MPIDI_OFI_IDATA_PIPELINE,
remote_addr, match_bits), vci_local, tinjectdata);

struct send_alloc *p;
Expand Down Expand Up @@ -197,7 +205,7 @@ static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint ch
int ctx_idx = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.ctx_idx);
fi_addr_t remote_addr = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr);
uint64_t cq_data = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data);
uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.match_bits) |
uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.pipeline_tag) |
MPIDI_OFI_GPU_PIPELINE_SEND;
MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci_local).lock);
MPIDI_OFI_CALL_RETRY(fi_tsenddata(MPIDI_OFI_global.ctx[ctx_idx].tx,
Expand Down Expand Up @@ -318,7 +326,6 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr);
int ctx_idx = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.ctx_idx);
int vci = MPIDI_Request_get_vci(rreq);
uint64_t match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits);
uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.mask_bits);

struct chunk_req *chunk_req;
Expand All @@ -327,10 +334,14 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)

chunk_req->parent = rreq;
chunk_req->buf = host_buf;

uint64_t match_bits;
if (p->n_chunks == -1) {
match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits);
chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT;
} else {
match_bits |= MPIDI_OFI_GPU_PIPELINE_SEND;
match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.pipeline_tag) |
MPIDI_OFI_GPU_PIPELINE_SEND;
chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE;
}
MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci).lock);
Expand Down Expand Up @@ -380,24 +391,22 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync) = true;
}

uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data);
uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data);
/* ? - Not sure why sender cannot send packed data */
MPIR_Assert(packed == 0);
if (wc->len > 0) {
bool is_pipeline = (wc->data & MPIDI_OFI_IDATA_PIPELINE);
if (!is_pipeline) {
/* message from a normal send */
MPIR_Assert(n_chunks == 0);
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 1;
mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
} else {
MPIR_Assert(n_chunks > 0);
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = n_chunks;
struct pipeline_header *p_hdr = host_buf;
MPIR_Assert(p_hdr->n_chunks > 0);
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = p_hdr->n_chunks;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.pipeline_tag) = p_hdr->pipeline_tag;
/* There is no data in the init chunk, free the buffer */
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf);
/* Post recv for the remaining chunks. */
for (int i = 0; i < n_chunks; i++) {
mpi_errno = start_recv_chunk(rreq, i, n_chunks);
for (int i = 0; i < p_hdr->n_chunks; i++) {
mpi_errno = start_recv_chunk(rreq, i, p_hdr->n_chunks);
MPIR_ERR_CHECK(mpi_errno);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/mpid/ch4/netmod/ofi/ofi_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
MPI_Aint count, MPI_Datatype datatype,
MPL_pointer_attr_t attr, MPI_Aint data_sz,
uint64_t cq_data, fi_addr_t remote_addr,
int vci_local, int ctx_idx, uint64_t match_bits);
int vci_local, int ctx_idx, uint64_t match_bits, int pipeline_tag);
int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype,
fi_addr_t remote_addr, int vci_local,
Expand Down
3 changes: 3 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_pre.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ typedef struct {
int enable_striping; /* Flag to enable striping per communicator. */
int enable_hashing; /* Flag to enable hashing per communicator. */
int *pref_nic; /* Array to specify the preferred NIC for each rank (if needed) */
int pipeline_tag; /* match_bits for gpu_pipeline chunks */
} MPIDI_OFI_comm_t;
enum {
MPIDI_AMTYPE_NONE = 0,
Expand Down Expand Up @@ -223,6 +224,7 @@ typedef struct {
fi_addr_t remote_addr;
uint64_t cq_data;
uint64_t match_bits;
int pipeline_tag;
int num_remain;
} send;
struct {
Expand All @@ -232,6 +234,7 @@ typedef struct {
uint64_t match_bits;
uint64_t mask_bits;
MPI_Aint offset;
int pipeline_tag;
int num_inrecv;
int num_remain;
bool is_sync;
Expand Down
3 changes: 2 additions & 1 deletion src/mpid/ch4/netmod/ofi/ofi_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,10 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou
data_sz >= MPIR_CVAR_CH4_OFI_GPU_PIPELINE_THRESHOLD) {
/* Pipeline path */
fi_addr_t remote_addr = MPIDI_OFI_av_to_phys(addr, receiver_nic, vci_remote);
MPIDI_OFI_COMM(comm).pipeline_tag += 1;
mpi_errno = MPIDI_OFI_gpu_pipeline_send(sreq, buf, count, datatype, attr, data_sz,
cq_data, remote_addr, vci_local, ctx_idx,
match_bits);
match_bits, MPIDI_OFI_COMM(comm).pipeline_tag);
MPIR_ERR_CHECK(mpi_errno);

MPIR_T_PVAR_COUNTER_INC(MULTINIC, nic_sent_bytes_count[sender_nic], data_sz);
Expand Down
30 changes: 2 additions & 28 deletions src/mpid/ch4/netmod/ofi/ofi_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@
#define MPIDI_OFI_IDATA_ERROR_BITS (2)
/* The number of bits in the immediate data field allocated to the source rank and error propagation. */
#define MPIDI_OFI_IDATA_SRC_ERROR_BITS (MPIDI_OFI_IDATA_SRC_BITS + MPIDI_OFI_IDATA_ERROR_BITS)
/* The number of bits in the immediate data field allocated to MPI_Packed datatype for GPU. */
#define MPIDI_OFI_IDATA_GPU_PACKED_BITS (1)
/* The offset of bits in the immediate data field allocated to number of message chunks. */
#define MPIDI_OFI_IDATA_GPUCHUNK_OFFSET (MPIDI_OFI_IDATA_SRC_ERROR_BITS + MPIDI_OFI_IDATA_GPU_PACKED_BITS)
/* Bit mask for MPIR_ERR_OTHER */
#define MPIDI_OFI_ERR_OTHER (0x1ULL)
/* Bit mask for MPIR_PROC_FAILED */
#define MPIDI_OFI_ERR_PROC_FAILED (0x2ULL)
/* Bit mask for gpu pipeline */
#define MPIDI_OFI_IDATA_PIPELINE (1ULL << 32)

/* Set the error bits */
MPL_STATIC_INLINE_PREFIX void MPIDI_OFI_idata_set_error_bits(uint64_t * data_field,
Expand Down Expand Up @@ -75,30 +73,6 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_idata_get_error_bits(uint64_t idata)
}
}

/* Set the gpu packed bit */
static inline void MPIDI_OFI_idata_set_gpu_packed_bit(uint64_t * data_field, uint64_t is_packed)
{
*data_field = (*data_field) | (is_packed << MPIDI_OFI_IDATA_SRC_ERROR_BITS);
}

/* Get the gpu packed bit from the OFI data field. */
static inline uint32_t MPIDI_OFI_idata_get_gpu_packed_bit(uint64_t idata)
{
return (idata >> MPIDI_OFI_IDATA_SRC_ERROR_BITS) & 0x1ULL;
}

/* Set gpu chunk bits */
static inline void MPIDI_OFI_idata_set_gpuchunk_bits(uint64_t * data_field, uint64_t n_chunks)
{
*data_field = (*data_field) | (n_chunks << MPIDI_OFI_IDATA_GPUCHUNK_OFFSET);
}

/* Get gpu chunks from the OFI data field. */
static inline uint32_t MPIDI_OFI_idata_get_gpuchunk_bits(uint64_t idata)
{
return (idata >> MPIDI_OFI_IDATA_GPUCHUNK_OFFSET);
}

/* There are 4 protocol bits:
* - MPIDI_DYNPROC_SEND
* - MPIDI_OFI_HUGE_SEND
Expand Down

0 comments on commit 4632550

Please sign in to comment.