Skip to content

Commit

Permalink
smp optimization for subcomms, alltoall
Browse files Browse the repository at this point in the history
  • Loading branch information
jjwilke committed Apr 3, 2019
1 parent 622b0ee commit b8239e9
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 16 deletions.
2 changes: 2 additions & 0 deletions sstmac/hardware/merlin/merlin_nic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ class MerlinNIC :
auto* req = link_control_->recv(vn);
while (req){
MyRequest* myreq = static_cast<MyRequest*>(req);
uint64_t size = req->size_in_bits/8;
Timestamp delay = size / link_control_->bw
#if MERLIN_DEBUG_PACKET
if (myreq->flow_id == -1){
std::cout << "Got packet of size " << (req->size_in_bits/8) << " at t=" << now().sec() << std::endl;
Expand Down
8 changes: 8 additions & 0 deletions sstmac/test_skeletons/mpi_smp_collectives.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ int USER_MAIN(int argc, char** argv)
for (int r=0; r < nrepeat; ++r){
MPI_Allreduce(nullptr, nullptr, 1000, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
}
MPI_Alltoall(nullptr, 1000, MPI_INT, nullptr, 1000, MPI_INT, MPI_COMM_WORLD);
MPI_Allgather(nullptr, 1000, MPI_INT, nullptr, 1000, MPI_INT, MPI_COMM_WORLD);

MPI_Comm subComm;
MPI_Comm_split(MPI_COMM_WORLD, me % 2, me, &subComm);
MPI_Alltoall(nullptr, 1000, MPI_INT, nullptr, 1000, MPI_INT, subComm);

MPI_Comm_free(&subComm);

MPI_Finalize();
return 0;
Expand Down
6 changes: 6 additions & 0 deletions sumi-mpi/mpi_api_comm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ MpiApi::commSplit(MPI_Comm incomm, int color, int key, MPI_Comm *outcomm)
start_comm_call(MPI_Comm_split,incomm);
MpiComm* incommPtr = getComm(incomm);
MpiComm* outcommPtr = comm_factory_.commSplit(incommPtr, color, key);

addCommPtr(outcommPtr, outcomm);
mpi_api_debug(sprockit::dbg::mpi,
"MPI_Comm_split(%s,%d,%d,*%s) exit",
Expand All @@ -278,8 +279,13 @@ MpiApi::commSplit(MPI_Comm incomm, int color, int key, MPI_Comm *outcomm)
//but also assign an id to the underlying group
if (outcommPtr->id() != MPI_COMM_NULL){
outcommPtr->group()->setId(group_counter_++);
if (smp_optimize_){
outcommPtr->createSmpCommunicator(smp_neighbors_, engine(), Message::default_cq);
}
}



endAPICall();
#ifdef SSTMAC_OTF2_ENABLED
if (otf2_writer_){
Expand Down
14 changes: 14 additions & 0 deletions sumi-mpi/mpi_comm/mpi_comm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ MpiComm::globalRankSetIntersection(const std::set<int> &neighbors) const
}
}

std::set<int>
MpiComm::commNeighbors(const std::set<int>& commWorldNeighbors) const
{
std::set<int> ret;
for (int i=0; i < nproc(); ++i){
int glbl = peerTask(i);
auto iter = commWorldNeighbors.find(glbl);
if (iter != commWorldNeighbors.end()){
ret.insert(i);
}
}
return ret;
}

void
MpiComm::deleteStatics()
{
Expand Down
2 changes: 2 additions & 0 deletions sumi-mpi/mpi_comm/mpi_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ class MpiComm : public Communicator
return int(peerTask(comm_rank));
}

std::set<int> commNeighbors(const std::set<int>& commWorldNeighbors) const;

int globalToCommRank(int global_rank) const override;

int nproc() const override {
Expand Down
5 changes: 3 additions & 2 deletions sumi/collective_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,9 @@ DagCollectiveActor::dataRecved(
Action* ac = active_comms_[id];
if (ac == nullptr){
spkt_throw_printf(sprockit::ValueError,
"on %d, received data for unknown receive %u from %d on round %d",
dom_me_, id, msg->domSender(), msg->round());
"on %d, received data for unknown receive %u from %d on round %d\n%s",
dom_me_, id, msg->domSender(), msg->round(),
msg->toString().c_str());
}

dataRecved(ac, msg, recvd_buffer);
Expand Down
18 changes: 12 additions & 6 deletions sumi/communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,22 @@ Communicator::createSmpCommunicator(const std::set<int>& neighbors, CollectiveEn
auto neighbors_subset = globalRankSetIntersection(neighbors);
if (neighbors_subset.size() == 1) return; //no smp parallelism


int myGlobalRank = commToGlobalRank(my_comm_rank_);
if (neighbors_subset.size() > 1){ //there is smp parallelism to be had here
int idx = 0;
int my_smp_rank = 0;
std::vector<int> local_to_global(neighbors_subset.size());
for (int rank : neighbors_subset){
local_to_global[idx] = rank;
if (rank == this->myCommRank()){
for (int glblRank : neighbors_subset){
local_to_global[idx] = glblRank;
if (glblRank == myGlobalRank){
my_smp_rank = idx;
}
idx++;
}



smp_comm_ = new MapCommunicator(my_smp_rank, std::move(local_to_global));
if (smp_comm_->nproc() == 0){
spkt_abort_printf("Created SMP communicator of size 0!"
Expand All @@ -86,19 +91,20 @@ Communicator::createSmpCommunicator(const std::set<int>& neighbors, CollectiveEn
}

std::vector<int> smp_ranks(this->nproc());
engine->allgather(smp_ranks.data(), &my_smp_rank, 1, sizeof(int), -2, cq_id);
int tag = -2;
engine->allgather(smp_ranks.data(), &my_smp_rank, 1, sizeof(int), tag, cq_id, this);
engine->blockUntilNext(cq_id);

std::map<int,int> rank_counts;
int my_owner_rank = -1;
if (my_smp_rank == 0){
int my_owner_rank = -1;
std::vector<int> owner_to_global;
idx = 0;
for (int rank=0; rank < this->nproc(); ++rank){
int local_smp_rank = smp_ranks[rank];
rank_counts[local_smp_rank]++;
if (local_smp_rank == 0){
owner_to_global.push_back(rank);
owner_to_global.push_back(commToGlobalRank(rank));
if (rank == this->myCommRank()){
my_owner_rank = idx;
}
Expand Down
39 changes: 36 additions & 3 deletions sumi/sim_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -689,9 +689,40 @@ CollectiveEngine::alltoall(void *dst, void *src, int nelems, int type_size, int

if (!comm) comm = global_domain_;

DagCollective* coll = AllToAllCollective::getBuilderLibrary("macro")->getBuilder(alltoall_type_)
->create(this, dst, src, nelems, type_size, tag, cq_id, comm);
return startCollective(coll);
auto* fact = AllToAllCollective::getBuilderLibrary("macro");
auto* builder = fact->getBuilder(alltoall_type_);
if (!builder){
spkt_abort_printf("invalid alltoall type requested: %s", allgather_type_.c_str());
}

if (comm->smpComm() && comm->smpBalanced()){
int smpSize = comm->smpComm()->nproc();
void* intraDst = dst ? new char[nelems*type_size*smpSize] : nullptr;
int intra_tag = 1<<28 | tag;

BtreeGather* intra = new BtreeGather(this, 0, intraDst, src, smpSize*nelems,
type_size, intra_tag, cq_id, comm->smpComm());
DagCollective* prev;
if (comm->ownerComm()){
int inter_tag = 2<<28 | tag;
AllToAllCollective* inter = builder->create(this, dst, intraDst, smpSize*nelems,
type_size, inter_tag, cq_id, comm->ownerComm());
intra->setSubsequent(inter);
prev = inter;
} else {
prev = intra;
}
int bcast_tag = 3<<28 | tag;
auto* bcast = new BinaryTreeBcastCollective(this, 0, dst, comm->nproc()*nelems,
type_size, bcast_tag, cq_id, comm->smpComm());
prev->setSubsequent(bcast);
auto* final = new DoNothingCollective(this, tag, cq_id, comm);
bcast->setSubsequent(final);
return startCollective(intra);
} else {
AllToAllCollective* coll = builder->create(this, dst, src, nelems, type_size, tag, cq_id, comm);
return startCollective(coll);
}
}

CollectiveDoneMessage*
Expand Down Expand Up @@ -732,12 +763,14 @@ CollectiveEngine::allgather(void *dst, void *src, int nelems, int type_size, int
int intra_tag = 1<<28 | tag;



AllgatherCollective* intra = builder->create(this, intraDst, src, nelems,
type_size, intra_tag, cq_id, comm->smpComm());

DagCollective* prev;
if (comm->ownerComm()){
int inter_tag = 2<<28 | tag;

AllgatherCollective* inter = builder->create(this, dst, intraDst, smpSize*nelems, type_size,
inter_tag, cq_id, comm->ownerComm());
intra->setSubsequent(inter);
Expand Down
2 changes: 1 addition & 1 deletion tests/reference/test_core_apps_direct_alltoall.ref-out
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Estimated total runtime of 0.01380041 seconds
Estimated total runtime of 0.00209360 seconds
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Estimated total runtime of 0.00001778 seconds
Estimated total runtime of 0.00386767 seconds
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Estimated total runtime of 0.00002888 seconds
Estimated total runtime of 0.12326445 seconds
19 changes: 17 additions & 2 deletions tests/test_configs/test_smp_collectives_optimized.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@ node {
indexing = block
allocation = first_available
name = mpi_smp_collectives
launch_cmd = aprun -n 256 -N 4
launch_cmd = aprun -n 64 -N 4
start = 0ms
message_size = 1KB
mpi {
alltoall = bruck
allgather = bruck
smp_optimize = true
}
}
app2 {
indexing = block
allocation = first_available
name = mpi_smp_collectives
launch_cmd = aprun -n 64 -N 4
start = 0ms
message_size = 1KB
mpi {
alltoall = direct
allgather = ring
smp_optimize = true
}
}
Expand Down Expand Up @@ -64,7 +79,7 @@ switch {

topology {
name = torus
geometry = [4,2,4]
geometry = [4,3,4]
concentration = 2
}

Expand Down

0 comments on commit b8239e9

Please sign in to comment.