diff --git a/src/mpid/ch4/netmod/ofi/ofi_am_events.h b/src/mpid/ch4/netmod/ofi/ofi_am_events.h index 960eebe14c5..f7571870f78 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_am_events.h +++ b/src/mpid/ch4/netmod/ofi/ofi_am_events.h @@ -13,6 +13,83 @@ #include "ofi_am_impl.h" +MPL_STATIC_INLINE_PREFIX uint16_t MPIDI_OFI_am_get_next_recv_seqno(fi_addr_t addr) +{ + uint64_t id = addr; + void *r; + + r = MPIDIU_map_lookup(MPIDI_OFI_global.am_recv_seq_tracker, id); + if (r == MPIDIU_MAP_NOT_FOUND) { + MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE, + (MPL_DBG_FDEST, "First time adding recv seqno addr=0x%016lx\n", addr)); + MPIDIU_map_set(MPIDI_OFI_global.am_recv_seq_tracker, id, 0, MPL_MEM_OTHER); + return 0; /* no entry for this address yet: record and expect seqno 0 */ + } else { + return (uint16_t) (uintptr_t) r; /* the seqno is stored directly in the map's value pointer */ + } +} + +MPL_STATIC_INLINE_PREFIX void MPIDI_OFI_am_set_next_recv_seqno(fi_addr_t addr, uint16_t seqno) +{ + uint64_t id = addr; + + MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE, + (MPL_DBG_FDEST, "Next recv seqno=%d addr=0x%016lx\n", seqno, addr)); + + MPIDIU_map_update(MPIDI_OFI_global.am_recv_seq_tracker, id, (void *) (uintptr_t) seqno, + MPL_MEM_OTHER); +} + +MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_enqueue_unordered_msg(const MPIDI_OFI_am_header_t * + am_hdr) +{ + MPIDI_OFI_am_unordered_msg_t *uo_msg; + size_t uo_msg_len, packet_len; + /* Essentially, uo_msg_len == packet_len + sizeof(next,prev pointers) */ + + uo_msg_len = sizeof(*uo_msg) + am_hdr->am_hdr_sz + am_hdr->data_sz; + + /* Allocate a new memory region to store this unordered message. + * We are doing this because the original am_hdr comes from FI_MULTI_RECV + * buffer, which may be reused soon by OFI.
*/ + uo_msg = MPL_malloc(uo_msg_len, MPL_MEM_BUFFER); + if (uo_msg == NULL) + return MPI_ERR_NO_MEM; + + packet_len = sizeof(*am_hdr) + am_hdr->am_hdr_sz + am_hdr->data_sz; + MPIR_Memcpy(&uo_msg->am_hdr, am_hdr, packet_len); /* copies the header plus the am_hdr_sz + data_sz bytes that follow it */ + + DL_APPEND(MPIDI_OFI_global.am_unordered_msgs, uo_msg); + + return MPI_SUCCESS; +} + +/* Find and dequeue the queued message whose header matches (addr, seqno), then return it. + * Returns NULL when no such message is queued. Caller must free the returned pointer. */ +MPL_STATIC_INLINE_PREFIX MPIDI_OFI_am_unordered_msg_t + * MPIDI_OFI_am_claim_unordered_msg(fi_addr_t addr, uint16_t seqno) +{ + MPIDI_OFI_am_unordered_msg_t *uo_msg; + + /* Future optimization note: + * Currently we do a linear search every time, assuming that the number of items + * in the queue is extremely small. + * If that is not the case, we should consider using a better data structure and + * algorithm for the lookup. */ + DL_FOREACH(MPIDI_OFI_global.am_unordered_msgs, uo_msg) { + if (uo_msg->am_hdr.fi_src_addr == addr && uo_msg->am_hdr.seqno == seqno) { + MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, TERSE, + (MPL_DBG_FDEST, + "Found unordered message in the queue: addr=0x%016lx, seqno=%d\n", + addr, seqno)); + DL_DELETE(MPIDI_OFI_global.am_unordered_msgs, uo_msg); + return uo_msg; + } + } + + return NULL; +} + static inline int MPIDI_OFI_handle_short_am(MPIDI_OFI_am_header_t * msg_hdr) { int mpi_errno = MPI_SUCCESS; @@ -346,6 +423,9 @@ static inline int MPIDI_OFI_dispatch_ack(int rank, int context_id, uint64_t sreq msg.hdr.am_hdr_sz = sizeof(msg.pyld); msg.hdr.data_sz = 0; msg.hdr.am_type = am_type; + msg.hdr.seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank); + msg.hdr.fi_src_addr + = MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank); msg.pyld.sreq_ptr = sreq_ptr; MPIDI_OFI_CALL_RETRY_AM(fi_inject(MPIDI_OFI_global.ctx[0].tx, &msg, sizeof(msg), MPIDI_OFI_comm_to_phys(comm, rank)), diff --git a/src/mpid/ch4/netmod/ofi/ofi_am_impl.h b/src/mpid/ch4/netmod/ofi/ofi_am_impl.h index 
f5e73604d50..b68565dad84 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_am_impl.h +++ b/src/mpid/ch4/netmod/ofi/ofi_am_impl.h @@ -15,6 +15,35 @@ static inline int MPIDI_OFI_progress_do_queue(int vci_idx); +/* Return the sequence number for the next AM sent to dest_rank of comm, and store that number plus one in am_send_seq_tracker (keyed by the destination's OFI address). The first call for a destination returns 0. */ +MPL_STATIC_INLINE_PREFIX uint16_t MPIDI_OFI_am_fetch_incr_send_seqno(MPIR_Comm * comm, + int dest_rank) +{ + fi_addr_t addr = MPIDI_OFI_comm_to_phys(comm, dest_rank); + uint64_t id = addr; + uint16_t seq, old_seq; + void *ret; + ret = MPIDIU_map_lookup(MPIDI_OFI_global.am_send_seq_tracker, id); + if (ret == MPIDIU_MAP_NOT_FOUND) + old_seq = 0; + else + old_seq = (uint16_t) (uintptr_t) ret; + + seq = old_seq + 1; /* stored back into a uint16_t, so 65535 wraps to 0 */ + MPIDIU_map_update(MPIDI_OFI_global.am_send_seq_tracker, id, (void *) (uintptr_t) seq, + MPL_MEM_OTHER); + + MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE, + (MPL_DBG_FDEST, + "Generated seqno=%d for dest_rank=%d " + "(context_id=0x%08x, src_addr=0x%016lx, dest_addr=0x%016lx)\n", + old_seq, dest_rank, comm->context_id, + MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank), + addr)); + + return old_seq; +} + /* Per-object lock for OFI @@ -193,6 +222,9 @@ static inline int MPIDI_OFI_do_am_isend_header(int rank, msg_hdr->am_hdr_sz = am_hdr_sz; msg_hdr->data_sz = 0; msg_hdr->am_type = MPIDI_AMTYPE_SHORT_HDR; + msg_hdr->seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank); + msg_hdr->fi_src_addr + = MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank); MPIR_Assert((uint64_t) comm->rank < (1ULL << MPIDI_OFI_AM_RANK_BITS)); @@ -245,6 +277,9 @@ static inline int MPIDI_OFI_am_isend_long(int rank, msg_hdr->am_hdr_sz = am_hdr_sz; msg_hdr->data_sz = data_sz; msg_hdr->am_type = MPIDI_AMTYPE_LMT_REQ; + msg_hdr->seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank); + msg_hdr->fi_src_addr + = MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank); lmt_info = &MPIDI_OFI_AMREQUEST_HDR(sreq, lmt_info); lmt_info->context_id = 
comm->context_id; @@ -332,6 +367,9 @@ static inline int MPIDI_OFI_am_isend_short(int rank, msg_hdr->am_hdr_sz = am_hdr_sz; msg_hdr->data_sz = count; msg_hdr->am_type = MPIDI_AMTYPE_SHORT; + msg_hdr->seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank); + msg_hdr->fi_src_addr + = MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank); iov = MPIDI_OFI_AMREQUEST_HDR(sreq, iov); @@ -499,6 +537,9 @@ static inline int MPIDI_OFI_do_inject(int rank, msg_hdr.am_hdr_sz = am_hdr_sz; msg_hdr.data_sz = 0; msg_hdr.am_type = MPIDI_AMTYPE_SHORT_HDR; + msg_hdr.seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank); + msg_hdr.fi_src_addr + = MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank); MPIR_Assert((uint64_t) comm->rank < (1ULL << MPIDI_OFI_AM_RANK_BITS)); diff --git a/src/mpid/ch4/netmod/ofi/ofi_events.h b/src/mpid/ch4/netmod/ofi/ofi_events.h index 8849671f269..ec54f116825 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_events.h +++ b/src/mpid/ch4/netmod/ofi/ofi_events.h @@ -29,23 +29,23 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_peek_event(struct fi_cq_tagged_entry *wc, MPIR_Request * rreq) { size_t count; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_NETMOD_PEEK_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_NETMOD_PEEK_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_PEEK_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_PEEK_EVENT); MPIDI_OFI_REQUEST(rreq, util_id) = MPIDI_OFI_PEEK_FOUND; rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc); rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag); count = wc->len; rreq->status.MPI_ERROR = MPI_SUCCESS; MPIR_STATUS_SET_COUNT(rreq->status, count); - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_NETMOD_PEEK_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_PEEK_EVENT); return MPI_SUCCESS; } MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_peek_empty_event(struct fi_cq_tagged_entry *wc, MPIR_Request * rreq) { - 
MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_NETMOD_PEEK_EMPTY_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_NETMOD_PEEK_EMPTY_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_PEEK_EMPTY_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_PEEK_EMPTY_EVENT); MPIDI_OFI_dynamic_process_request_t *ctrl; switch (MPIDI_OFI_REQUEST(rreq, event_id)) { @@ -64,7 +64,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_peek_empty_event(struct fi_cq_tagged_entr break; } - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_NETMOD_PEEK_EMPTY_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_PEEK_EMPTY_EVENT); return MPI_SUCCESS; } @@ -74,8 +74,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_recv_event(struct fi_cq_tagged_entry *wc, int mpi_errno = MPI_SUCCESS; MPI_Aint last; size_t count; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_RECV_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_RECV_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_RECV_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_RECV_EVENT); rreq->status.MPI_ERROR = MPI_SUCCESS; rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc); @@ -150,7 +150,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_recv_event(struct fi_cq_tagged_entry *wc, /* Polling loop will check for truncation */ fn_exit: - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_RECV_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_RECV_EVENT); return mpi_errno; fn_fail: rreq->status.MPI_ERROR = mpi_errno; @@ -165,8 +165,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_recv_huge_event(struct fi_cq_tagged_entry int mpi_errno = MPI_SUCCESS; MPIDI_OFI_huge_recv_t *recv = NULL; MPIR_Comm *comm_ptr; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_RECV_HUGE_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_RECV_HUGE_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_RECV_HUGE_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_RECV_HUGE_EVENT); /* Check that the sender didn't underflow the message by 
sending less than * the huge message threshold. */ @@ -239,7 +239,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_recv_huge_event(struct fi_cq_tagged_entry MPIDI_OFI_get_huge_event(NULL, (MPIR_Request *) recv); fn_exit: - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_RECV_HUGE_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_RECV_HUGE_EVENT); return mpi_errno; fn_fail: goto fn_exit; @@ -250,8 +250,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * sreq, int event_id) { int c; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_SEND_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_SEND_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_SEND_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_SEND_EVENT); MPIR_cc_decr(sreq->cc_ptr, &c); @@ -266,7 +266,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request_free(sreq); } /* c != 0, ssend */ - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_SEND_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_SEND_EVENT); return MPI_SUCCESS; } @@ -275,8 +275,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_huge_event(struct fi_cq_tagged_entry { int mpi_errno = MPI_SUCCESS; int c; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_SEND_EVENT_HUGE); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_SEND_EVENT_HUGE); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_SEND_EVENT_HUGE); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_SEND_EVENT_HUGE); MPIR_cc_decr(sreq->cc_ptr, &c); @@ -315,7 +315,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_huge_event(struct fi_cq_tagged_entry } /* c != 0, ssend */ fn_exit: - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_SEND_EVENT_HUGE); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_SEND_EVENT_HUGE); return mpi_errno; fn_fail: goto fn_exit; @@ -326,13 +326,13 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_ssend_ack_event(struct fi_cq_tagged_entry { int mpi_errno; MPIDI_OFI_ssendack_request_t 
*req = (MPIDI_OFI_ssendack_request_t *) sreq; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_SSEND_ACK_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_SSEND_ACK_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_SSEND_ACK_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_SSEND_ACK_EVENT); mpi_errno = MPIDI_OFI_send_event(NULL, req->signal_req, MPIDI_OFI_REQUEST(req->signal_req, event_id)); MPL_free(req); - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_SSEND_ACK_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_SSEND_ACK_EVENT); return mpi_errno; } @@ -351,8 +351,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_get_huge_event(struct fi_cq_tagged_entry int mpi_errno = MPI_SUCCESS; MPIDI_OFI_huge_recv_t *recv = (MPIDI_OFI_huge_recv_t *) req; uint64_t remote_key; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_GETHUGE_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_GETHUGE_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_GETHUGE_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_GETHUGE_EVENT); if (recv->localreq && recv->cur_offset != 0) { /* If this is true, then the message has a posted * receive already and we'll be able to find the @@ -398,7 +398,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_get_huge_event(struct fi_cq_tagged_entry } fn_exit: - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_GETHUGE_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_GETHUGE_EVENT); return mpi_errno; fn_fail: goto fn_exit; @@ -408,8 +408,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_chunk_done_event(struct fi_cq_tagged_entr MPIR_Request * req) { int c; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_CHUNK_DONE_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_CHUNK_DONE_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_CHUNK_DONE_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_CHUNK_DONE_EVENT); MPIDI_OFI_chunk_request *creq = (MPIDI_OFI_chunk_request *) req; MPIR_cc_decr(creq->parent->cc_ptr, &c); @@ 
-418,7 +418,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_chunk_done_event(struct fi_cq_tagged_entr MPIR_Request_free(creq->parent); MPL_free(creq); - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_CHUNK_DONE_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_CHUNK_DONE_EVENT); return MPI_SUCCESS; } @@ -426,8 +426,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_inject_emu_event(struct fi_cq_tagged_entr MPIR_Request * req) { int incomplete; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_OFI_INJECT_EMU_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_OFI_INJECT_EMU_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_INJECT_EMU_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_INJECT_EMU_EVENT); MPIR_cc_decr(req->cc_ptr, &incomplete); @@ -437,45 +437,45 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_inject_emu_event(struct fi_cq_tagged_entr OPA_decr_int(&MPIDI_OFI_global.am_inflight_inject_emus); } - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_OFI_INJECT_EMU_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_INJECT_EMU_EVENT); return MPI_SUCCESS; } MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_rma_done_event(struct fi_cq_tagged_entry *wc, MPIR_Request * in_req) { - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_CH4_OFI_RMA_DONE_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_CH4_OFI_RMA_DONE_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_RMA_DONE_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_RMA_DONE_EVENT); MPIDI_OFI_win_request_t *req = (MPIDI_OFI_win_request_t *) in_req; MPIDI_OFI_win_request_complete(req); - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_CH4_OFI_RMA_DONE_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_RMA_DONE_EVENT); return MPI_SUCCESS; } MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_accept_probe_event(struct fi_cq_tagged_entry *wc, MPIR_Request * rreq) { - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_CH4_OFI_ACCEPT_PROBE_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_CH4_OFI_ACCEPT_PROBE_EVENT); + 
MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_ACCEPT_PROBE_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_ACCEPT_PROBE_EVENT); MPIDI_OFI_dynamic_process_request_t *ctrl = (MPIDI_OFI_dynamic_process_request_t *) rreq; ctrl->source = MPIDI_OFI_cqe_get_source(wc); ctrl->tag = MPIDI_OFI_init_get_tag(wc->tag); ctrl->msglen = wc->len; ctrl->done = MPIDI_OFI_PEEK_FOUND; - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_CH4_OFI_ACCEPT_PROBE_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_ACCEPT_PROBE_EVENT); return MPI_SUCCESS; } MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_dynproc_done_event(struct fi_cq_tagged_entry *wc, MPIR_Request * rreq) { - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_CH4_OFI_DYNPROC_DONE_EVENT); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_CH4_OFI_DYNPROC_DONE_EVENT); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_DYNPROC_DONE_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_DYNPROC_DONE_EVENT); MPIDI_OFI_dynamic_process_request_t *ctrl = (MPIDI_OFI_dynamic_process_request_t *) rreq; ctrl->done++; - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_CH4_OFI_DYNPROC_DONE_EVENT); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_DYNPROC_DONE_EVENT); return MPI_SUCCESS; } @@ -485,8 +485,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_isend_event(struct fi_cq_tagged_entry int mpi_errno = MPI_SUCCESS; MPIDI_OFI_am_header_t *msg_hdr; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_HANDLE_SEND_COMPLETION); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_HANDLE_SEND_COMPLETION); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_AM_ISEND_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_AM_ISEND_EVENT); msg_hdr = &MPIDI_OFI_AMREQUEST_HDR(sreq, msg_hdr); MPID_Request_complete(sreq); /* FIXME: Should not call MPIDI in NM ? 
*/ @@ -509,7 +509,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_isend_event(struct fi_cq_tagged_entry MPIR_ERR_POP(mpi_errno); fn_exit: - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_HANDLE_SEND_COMPLETION); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_AM_ISEND_EVENT); return mpi_errno; fn_fail: goto fn_exit; @@ -520,11 +520,33 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_recv_event(struct fi_cq_tagged_entry * { int mpi_errno = MPI_SUCCESS; MPIDI_OFI_am_header_t *am_hdr; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_HANDLE_RECV_COMPLETION); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_HANDLE_RECV_COMPLETION); + MPIDI_OFI_am_unordered_msg_t *uo_msg = NULL; + fi_addr_t fi_src_addr; + uint16_t expected_seqno, next_seqno; + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_AM_RECV_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_AM_RECV_EVENT); am_hdr = (MPIDI_OFI_am_header_t *) wc->buf; + expected_seqno = MPIDI_OFI_am_get_next_recv_seqno(am_hdr->fi_src_addr); + if (am_hdr->seqno != expected_seqno) { + /* This message came earlier than the one that we were expecting. + * Put it in the queue to process it later. */ + MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, TERSE, + (MPL_DBG_FDEST, + "Expected seqno=%d but got %d (am_type=%d addr=0x%016lx). 
" + "Enqueueing it to the queue.\n", + expected_seqno, am_hdr->seqno, am_hdr->am_type, am_hdr->fi_src_addr)); + mpi_errno = MPIDI_OFI_am_enqueue_unordered_msg(am_hdr); + if (mpi_errno != MPI_SUCCESS) + MPIR_ERR_POP(mpi_errno); + goto fn_exit; + } + + /* Received an expected message */ + repeat: + fi_src_addr = am_hdr->fi_src_addr; + next_seqno = am_hdr->seqno + 1; switch (am_hdr->am_type) { case MPIDI_AMTYPE_SHORT_HDR: mpi_errno = MPIDI_OFI_handle_short_am_hdr(am_hdr, am_hdr->payload); @@ -562,8 +584,22 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_recv_event(struct fi_cq_tagged_entry * MPIR_Assert(0); } + /* For the first iteration (=in case we can process the message just received + * from OFI immediately), uo_msg is NULL, so freeing it is no-op. + * Otherwise, free it here before getting another uo_msg. */ + MPL_free(uo_msg); + + /* See if we can process other messages in the queue */ + if ((uo_msg = MPIDI_OFI_am_claim_unordered_msg(fi_src_addr, next_seqno)) != NULL) { + am_hdr = &uo_msg->am_hdr; + goto repeat; + } + + /* Record the next expected sequence number from fi_src_addr */ + MPIDI_OFI_am_set_next_recv_seqno(fi_src_addr, next_seqno); + fn_exit: - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_HANDLE_RECV_COMPLETION); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_AM_RECV_EVENT); return mpi_errno; fn_fail: goto fn_exit; @@ -576,8 +612,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_read_event(struct fi_cq_tagged_entry * MPIR_Request *rreq; MPIDI_OFI_am_request_t *ofi_req; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_HANDLE_READ_COMPLETION); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_HANDLE_READ_COMPLETION); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_AM_READ_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_AM_READ_EVENT); ofi_req = MPL_container_of(wc->op_context, MPIDI_OFI_am_request_t, context); ofi_req->req_hdr->lmt_cntr--; @@ -598,7 +634,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_read_event(struct fi_cq_tagged_entry * 
ofi_req->req_hdr->target_cmpl_cb(rreq); fn_exit: MPIDIU_release_buf((void *) ofi_req); - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_HANDLE_READ_COMPLETION); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_AM_READ_EVENT); return mpi_errno; fn_fail: goto fn_exit; @@ -608,12 +644,12 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_repost_event(struct fi_cq_tagged_entry MPIR_Request * rreq) { int mpi_errno = MPI_SUCCESS; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_REPOST_BUFFER); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_REPOST_BUFFER); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_AM_REPOST_EVENT); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_AM_REPOST_EVENT); mpi_errno = MPIDI_OFI_repost_buffer(wc->op_context, rreq); - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_REPOST_BUFFER); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_AM_REPOST_EVENT); return mpi_errno; } @@ -748,8 +784,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_handle_cq_entries(struct fi_cq_tagged_ent { int i, mpi_errno = MPI_SUCCESS; MPIR_Request *req; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_HANDLE_CQ_ENTRIES); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_HANDLE_CQ_ENTRIES); + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_HANDLE_CQ_ENTRIES); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_HANDLE_CQ_ENTRIES); for (i = 0; i < num; i++) { req = MPIDI_OFI_context_to_request(wc[i].op_context); @@ -757,7 +793,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_handle_cq_entries(struct fi_cq_tagged_ent } fn_exit: - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_HANDLE_CQ_ENTRIES); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_HANDLE_CQ_ENTRIES); return mpi_errno; fn_fail: goto fn_exit; @@ -768,8 +804,8 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_handle_cq_error(int vci_idx, ssize_t ret) int mpi_errno = MPI_SUCCESS; struct fi_cq_err_entry e; MPIR_Request *req; - MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_NETMOD_HANDLE_CQ_ERROR); - MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_NETMOD_HANDLE_CQ_ERROR); + 
MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_OFI_HANDLE_CQ_ERROR); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_OFI_HANDLE_CQ_ERROR); switch (ret) { case -FI_EAVAIL: @@ -825,7 +861,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_handle_cq_error(int vci_idx, ssize_t ret) } fn_exit: - MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_NETMOD_HANDLE_CQ_ERROR); + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_OFI_HANDLE_CQ_ERROR); return mpi_errno; fn_fail: goto fn_exit; diff --git a/src/mpid/ch4/netmod/ofi/ofi_init.c b/src/mpid/ch4/netmod/ofi/ofi_init.c index d397ee431ea..663ec89c93e 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_init.c +++ b/src/mpid/ch4/netmod/ofi/ofi_init.c @@ -932,6 +932,10 @@ int MPIDI_OFI_mpi_init_hook(int rank, int size, int appnum, int *tag_bits, MPIR_ FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, &optlen, sizeof(optlen)), setopt); + MPIDIU_map_create(&MPIDI_OFI_global.am_recv_seq_tracker, MPL_MEM_BUFFER); + MPIDIU_map_create(&MPIDI_OFI_global.am_send_seq_tracker, MPL_MEM_BUFFER); + MPIDI_OFI_global.am_unordered_msgs = NULL; + for (i = 0; i < MPIDI_OFI_NUM_AM_BUFFERS; i++) { MPIDI_OFI_global.am_bufs[i] = MPL_malloc(MPIDI_OFI_AM_BUFF_SZ, MPL_MEM_BUFFER); MPIDI_OFI_global.am_reqs[i].event_id = MPIDI_OFI_EVENT_AM_RECV; @@ -1073,6 +1077,14 @@ int MPIDI_OFI_mpi_finalize_hook(void) MPIDIU_map_destroy(MPIDI_OFI_global.win_map); if (MPIDI_OFI_ENABLE_AM) { + while (MPIDI_OFI_global.am_unordered_msgs) { + MPIDI_OFI_am_unordered_msg_t *uo_msg = MPIDI_OFI_global.am_unordered_msgs; + DL_DELETE(MPIDI_OFI_global.am_unordered_msgs, uo_msg); + MPL_free(uo_msg); /* DL_DELETE only unlinks; each queued copy was MPL_malloc'ed at enqueue time and must be released here */ + } + MPIDIU_map_destroy(MPIDI_OFI_global.am_send_seq_tracker); + MPIDIU_map_destroy(MPIDI_OFI_global.am_recv_seq_tracker); + for (i = 0; i < MPIDI_OFI_NUM_AM_BUFFERS; i++) MPL_free(MPIDI_OFI_global.am_bufs[i]); diff --git a/src/mpid/ch4/netmod/ofi/ofi_pre.h b/src/mpid/ch4/netmod/ofi/ofi_pre.h index 227aa9a6629..881f46a28cd 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_pre.h +++ b/src/mpid/ch4/netmod/ofi/ofi_pre.h @@ -74,9 +74,22 @@ typedef struct 
MPIDI_OFI_am_header_t { uint64_t am_type:MPIDI_OFI_AM_TYPE_BITS; uint64_t am_hdr_sz:MPIDI_OFI_AM_HDR_SZ_BITS; uint64_t data_sz:MPIDI_OFI_AM_DATA_SZ_BITS; + uint16_t seqno; /* Sequence number of this message. + * Number is unique to (fi_src_addr, fi_dest_addr) pair. */ + fi_addr_t fi_src_addr; /* OFI address of the sender */ uint64_t payload[0]; } MPIDI_OFI_am_header_t; +/* Represents early-arrived active messages. + * Queued to MPIDI_OFI_global.am_unordered_msgs */ +typedef struct MPIDI_OFI_am_unordered_msg { + struct MPIDI_OFI_am_unordered_msg *next; + struct MPIDI_OFI_am_unordered_msg *prev; + MPIDI_OFI_am_header_t am_hdr; + /* This is used as a variable-length structure. + * Additional memory region may follow. */ +} MPIDI_OFI_am_unordered_msg_t; + typedef struct { MPIDI_OFI_am_header_t hdr; MPIDI_OFI_ack_msg_payload_t pyld; diff --git a/src/mpid/ch4/netmod/ofi/ofi_types.h b/src/mpid/ch4/netmod/ofi/ofi_types.h index 52ec0e31201..95a4b55deb6 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_types.h +++ b/src/mpid/ch4/netmod/ofi/ofi_types.h @@ -353,6 +353,11 @@ typedef struct { MPIDIU_buf_pool_t *am_buf_pool; OPA_int_t am_inflight_inject_emus; OPA_int_t am_inflight_rma_send_mrs; + /* Sequence number trackers for active messages */ + void *am_send_seq_tracker; + void *am_recv_seq_tracker; + /* Queue (utlist) to store early-arrival active messages */ + MPIDI_OFI_am_unordered_msg_t *am_unordered_msgs; /* Completion queue buffering */ MPIDI_OFI_cq_buff_entry_t cq_buffered_static_list[MPIDI_OFI_NUM_CQ_BUFFERED]; diff --git a/src/mpid/ch4/src/ch4_impl.h b/src/mpid/ch4/src/ch4_impl.h index 73d8afd3df4..3d655911002 100644 --- a/src/mpid/ch4/src/ch4_impl.h +++ b/src/mpid/ch4/src/ch4_impl.h @@ -784,15 +784,23 @@ MPL_STATIC_INLINE_PREFIX void MPIDIU_map_create(void **out_map, MPL_memory_class MPL_STATIC_INLINE_PREFIX void MPIDIU_map_destroy(void *in_map) { MPIDIU_map_t *map = in_map; + MPIDIU_map_entry_t *e, *etmp; + HASH_ITER(hh, map->head, e, etmp) { + /* Free all remaining entries 
in the hash */ + HASH_DELETE(hh, map->head, e); + MPL_free(e); + } HASH_CLEAR(hh, map->head); MPL_free(map); } -MPL_STATIC_INLINE_PREFIX void MPIDIU_map_set(void *in_map, uint64_t id, void *val, - MPL_memory_class class) +MPL_STATIC_INLINE_PREFIX void MPIDIU_map_set_unsafe(void *in_map, uint64_t id, void *val, + MPL_memory_class class) { MPIDIU_map_t *map; MPIDIU_map_entry_t *map_entry; + /* MPIDIU_MAP_NOT_FOUND may be used as a special value to indicate an error. */ + MPIR_Assert(val != MPIDIU_MAP_NOT_FOUND); map = (MPIDIU_map_t *) in_map; map_entry = MPL_malloc(sizeof(MPIDIU_map_entry_t), class); MPIR_Assert(map_entry != NULL); @@ -801,6 +809,15 @@ MPL_STATIC_INLINE_PREFIX void MPIDIU_map_set(void *in_map, uint64_t id, void *va HASH_ADD(hh, map->head, key, sizeof(uint64_t), map_entry, class); } +/* Sets a (id -> val) pair into the map, assuming there's no entry with `id`. */ +MPL_STATIC_INLINE_PREFIX void MPIDIU_map_set(void *in_map, uint64_t id, void *val, + MPL_memory_class class) +{ + MPID_THREAD_CS_ENTER(POBJ, MPIDIU_THREAD_UTIL_MUTEX); + MPIDIU_map_set_unsafe(in_map, id, val, class); + MPID_THREAD_CS_EXIT(POBJ, MPIDIU_THREAD_UTIL_MUTEX); +} + MPL_STATIC_INLINE_PREFIX void MPIDIU_map_erase(void *in_map, uint64_t id) { MPIDIU_map_t *map; @@ -827,6 +844,29 @@ MPL_STATIC_INLINE_PREFIX void *MPIDIU_map_lookup(void *in_map, uint64_t id) return rc; } +/* Updates a value in the map which has `id` as a key. + If `id` does not exist in the map, it will be added. Returns the old value. 
*/ +MPL_STATIC_INLINE_PREFIX void *MPIDIU_map_update(void *in_map, uint64_t id, void *new_val, + MPL_memory_class class) +{ + void *rc; + MPIDIU_map_t *map; + MPIDIU_map_entry_t *map_entry; + + MPID_THREAD_CS_ENTER(POBJ, MPIDIU_THREAD_UTIL_MUTEX); /* same mutex MPIDIU_map_set takes, so set and update exclude each other */ + map = (MPIDIU_map_t *) in_map; + HASH_FIND(hh, map->head, &id, sizeof(uint64_t), map_entry); + if (map_entry == NULL) { + rc = MPIDIU_MAP_NOT_FOUND; /* no previous value to report */ + MPIDIU_map_set_unsafe(in_map, id, new_val, class); /* lock already held, so use the non-locking insert */ + } else { + rc = map_entry->value; /* hand the old value back to the caller */ + map_entry->value = new_val; + } + MPID_THREAD_CS_EXIT(POBJ, MPIDIU_THREAD_UTIL_MUTEX); + return rc; +} + /* Wait until active message acc ops are done. */ MPL_STATIC_INLINE_PREFIX int MPIDIG_wait_am_acc(MPIR_Win * win, int target_rank, int order_needed) {