Skip to content

Commit

Permalink
Merge pull request #3133 from hajimefu/ofi-am-reordering-pub
Browse files Browse the repository at this point in the history
ch4/ofi: Implement reorder logic in AM transport
  • Loading branch information
raffenet authored Apr 23, 2019
2 parents 5899c8d + 633b4ed commit aa91671
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 59 deletions.
80 changes: 80 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_am_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,83 @@

#include "ofi_am_impl.h"

MPL_STATIC_INLINE_PREFIX uint16_t MPIDI_OFI_am_get_next_recv_seqno(fi_addr_t addr)
{
uint64_t id = addr;
void *r;

r = MPIDIU_map_lookup(MPIDI_OFI_global.am_recv_seq_tracker, id);
if (r == MPIDIU_MAP_NOT_FOUND) {
MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
(MPL_DBG_FDEST, "First time adding recv seqno addr=0x%016lx\n", addr));
MPIDIU_map_set(MPIDI_OFI_global.am_recv_seq_tracker, id, 0, MPL_MEM_OTHER);
return 0;
} else {
return (uint16_t) (uintptr_t) r;
}
}

MPL_STATIC_INLINE_PREFIX void MPIDI_OFI_am_set_next_recv_seqno(fi_addr_t addr, uint16_t seqno)
{
uint64_t id = addr;

MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
(MPL_DBG_FDEST, "Next recv seqno=%d addr=0x%016lx\n", seqno, addr));

MPIDIU_map_update(MPIDI_OFI_global.am_recv_seq_tracker, id, (void *) (uintptr_t) seqno,
MPL_MEM_OTHER);
}

MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_am_enqueue_unordered_msg(const MPIDI_OFI_am_header_t *
am_hdr)
{
MPIDI_OFI_am_unordered_msg_t *uo_msg;
size_t uo_msg_len, packet_len;
/* Essentially, uo_msg_len == packet_len + sizeof(next,prev pointers) */

uo_msg_len = sizeof(*uo_msg) + am_hdr->am_hdr_sz + am_hdr->data_sz;

/* Allocate a new memory region to store this unordered message.
* We are doing this because the original am_hdr comes from FI_MULTI_RECV
* buffer, which may be reused soon by OFI. */
uo_msg = MPL_malloc(uo_msg_len, MPL_MEM_BUFFER);
if (uo_msg == NULL)
return MPI_ERR_NO_MEM;

packet_len = sizeof(*am_hdr) + am_hdr->am_hdr_sz + am_hdr->data_sz;
MPIR_Memcpy(&uo_msg->am_hdr, am_hdr, packet_len);

DL_APPEND(MPIDI_OFI_global.am_unordered_msgs, uo_msg);

return MPI_SUCCESS;
}

/* Find and dequeue a message that matches (comm, src_rank, seqno), then return it.
* Caller must free the returned pointer. */
MPL_STATIC_INLINE_PREFIX MPIDI_OFI_am_unordered_msg_t
* MPIDI_OFI_am_claim_unordered_msg(fi_addr_t addr, uint16_t seqno)
{
MPIDI_OFI_am_unordered_msg_t *uo_msg;

/* Future optimization note:
* Currently we are doing linear search every time, assuming that the number of items
* in the queue is extremely small.
* If it's not the case, we should consider using better data structure and algorithm
* to look up. */
DL_FOREACH(MPIDI_OFI_global.am_unordered_msgs, uo_msg) {
if (uo_msg->am_hdr.fi_src_addr == addr && uo_msg->am_hdr.seqno == seqno) {
MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, TERSE,
(MPL_DBG_FDEST,
"Found unordered message in the queue: addr=0x%016lx, seqno=%d\n",
addr, seqno));
DL_DELETE(MPIDI_OFI_global.am_unordered_msgs, uo_msg);
return uo_msg;
}
}

return NULL;
}

static inline int MPIDI_OFI_handle_short_am(MPIDI_OFI_am_header_t * msg_hdr)
{
int mpi_errno = MPI_SUCCESS;
Expand Down Expand Up @@ -346,6 +423,9 @@ static inline int MPIDI_OFI_dispatch_ack(int rank, int context_id, uint64_t sreq
msg.hdr.am_hdr_sz = sizeof(msg.pyld);
msg.hdr.data_sz = 0;
msg.hdr.am_type = am_type;
msg.hdr.seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank);
msg.hdr.fi_src_addr
= MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank);
msg.pyld.sreq_ptr = sreq_ptr;
MPIDI_OFI_CALL_RETRY_AM(fi_inject(MPIDI_OFI_global.ctx[0].tx, &msg, sizeof(msg),
MPIDI_OFI_comm_to_phys(comm, rank)),
Expand Down
41 changes: 41 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_am_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,35 @@

static inline int MPIDI_OFI_progress_do_queue(int vci_idx);

/* Acquire a sequence number to send, and record the next number */
MPL_STATIC_INLINE_PREFIX uint16_t MPIDI_OFI_am_fetch_incr_send_seqno(MPIR_Comm * comm,
int dest_rank)
{
fi_addr_t addr = MPIDI_OFI_comm_to_phys(comm, dest_rank);
uint64_t id = addr;
uint16_t seq, old_seq;
void *ret;
ret = MPIDIU_map_lookup(MPIDI_OFI_global.am_send_seq_tracker, id);
if (ret == MPIDIU_MAP_NOT_FOUND)
old_seq = 0;
else
old_seq = (uint16_t) (uintptr_t) ret;

seq = old_seq + 1;
MPIDIU_map_update(MPIDI_OFI_global.am_send_seq_tracker, id, (void *) (uintptr_t) seq,
MPL_MEM_OTHER);

MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
(MPL_DBG_FDEST,
"Generated seqno=%d for dest_rank=%d "
"(context_id=0x%08x, src_addr=0x%016lx, dest_addr=0x%016lx)\n",
old_seq, dest_rank, comm->context_id,
MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank),
addr));

return old_seq;
}

/*
Per-object lock for OFI
Expand Down Expand Up @@ -193,6 +222,9 @@ static inline int MPIDI_OFI_do_am_isend_header(int rank,
msg_hdr->am_hdr_sz = am_hdr_sz;
msg_hdr->data_sz = 0;
msg_hdr->am_type = MPIDI_AMTYPE_SHORT_HDR;
msg_hdr->seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank);
msg_hdr->fi_src_addr
= MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank);

MPIR_Assert((uint64_t) comm->rank < (1ULL << MPIDI_OFI_AM_RANK_BITS));

Expand Down Expand Up @@ -245,6 +277,9 @@ static inline int MPIDI_OFI_am_isend_long(int rank,
msg_hdr->am_hdr_sz = am_hdr_sz;
msg_hdr->data_sz = data_sz;
msg_hdr->am_type = MPIDI_AMTYPE_LMT_REQ;
msg_hdr->seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank);
msg_hdr->fi_src_addr
= MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank);

lmt_info = &MPIDI_OFI_AMREQUEST_HDR(sreq, lmt_info);
lmt_info->context_id = comm->context_id;
Expand Down Expand Up @@ -332,6 +367,9 @@ static inline int MPIDI_OFI_am_isend_short(int rank,
msg_hdr->am_hdr_sz = am_hdr_sz;
msg_hdr->data_sz = count;
msg_hdr->am_type = MPIDI_AMTYPE_SHORT;
msg_hdr->seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank);
msg_hdr->fi_src_addr
= MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank);

iov = MPIDI_OFI_AMREQUEST_HDR(sreq, iov);

Expand Down Expand Up @@ -499,6 +537,9 @@ static inline int MPIDI_OFI_do_inject(int rank,
msg_hdr.am_hdr_sz = am_hdr_sz;
msg_hdr.data_sz = 0;
msg_hdr.am_type = MPIDI_AMTYPE_SHORT_HDR;
msg_hdr.seqno = MPIDI_OFI_am_fetch_incr_send_seqno(comm, rank);
msg_hdr.fi_src_addr
= MPIDI_OFI_comm_to_phys(MPIR_Process.comm_world, MPIR_Process.comm_world->rank);

MPIR_Assert((uint64_t) comm->rank < (1ULL << MPIDI_OFI_AM_RANK_BITS));

Expand Down
Loading

0 comments on commit aa91671

Please sign in to comment.