From b8239e986ea7aed1f0d60e0c0532db696ae8fe71 Mon Sep 17 00:00:00 2001
From: jjwilke
Date: Wed, 3 Apr 2019 14:36:59 -0700
Subject: [PATCH] smp optimization for subcomms, alltoall

---
 sstmac/hardware/merlin/merlin_nic.cc          |  2 +
 sstmac/test_skeletons/mpi_smp_collectives.cc  |  8 ++++
 sumi-mpi/mpi_api_comm.cc                      |  6 +++
 sumi-mpi/mpi_comm/mpi_comm.cc                 | 14 +++++++
 sumi-mpi/mpi_comm/mpi_comm.h                  |  2 +
 sumi/collective_actor.cc                      |  5 ++-
 sumi/communicator.cc                          | 18 ++++++---
 sumi/sim_transport.cc                         | 39 +++++++++++++++++--
 .../test_core_apps_direct_alltoall.ref-out    |  2 +-
 ...ore_apps_smp_collectives_optimized.ref-out |  2 +-
 ...e_apps_smp_collectives_unoptimized.ref-out |  2 +-
 .../test_smp_collectives_optimized.ini        | 19 ++++++++-
 12 files changed, 103 insertions(+), 16 deletions(-)

diff --git a/sstmac/hardware/merlin/merlin_nic.cc b/sstmac/hardware/merlin/merlin_nic.cc
index 240935fb9..85ca61b20 100644
--- a/sstmac/hardware/merlin/merlin_nic.cc
+++ b/sstmac/hardware/merlin/merlin_nic.cc
@@ -191,6 +191,8 @@ class MerlinNIC :
     auto* req = link_control_->recv(vn);
     while (req){
       MyRequest* myreq = static_cast<MyRequest*>(req);
+      uint64_t size = req->size_in_bits/8;
+      Timestamp delay = size / link_control_->bw;
 #if MERLIN_DEBUG_PACKET
       if (myreq->flow_id == -1){
         std::cout << "Got packet of size " << (req->size_in_bits/8) << " at t=" << now().sec() << std::endl;
diff --git a/sstmac/test_skeletons/mpi_smp_collectives.cc b/sstmac/test_skeletons/mpi_smp_collectives.cc
index cf7b7d307..b6bb4e6cd 100644
--- a/sstmac/test_skeletons/mpi_smp_collectives.cc
+++ b/sstmac/test_skeletons/mpi_smp_collectives.cc
@@ -66,6 +66,14 @@ int USER_MAIN(int argc, char** argv)
   for (int r=0; r < nrepeat; ++r){
     MPI_Allreduce(nullptr, nullptr, 1000, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
   }
+  MPI_Alltoall(nullptr, 1000, MPI_INT, nullptr, 1000, MPI_INT, MPI_COMM_WORLD);
+  MPI_Allgather(nullptr, 1000, MPI_INT, nullptr, 1000, MPI_INT, MPI_COMM_WORLD);
+
+  MPI_Comm subComm;
+  MPI_Comm_split(MPI_COMM_WORLD, me % 2, me, &subComm);
+  MPI_Alltoall(nullptr, 1000, MPI_INT, nullptr, 1000, MPI_INT, subComm);
+
+  MPI_Comm_free(&subComm);
   MPI_Finalize();
   return 0;
diff --git a/sumi-mpi/mpi_api_comm.cc b/sumi-mpi/mpi_api_comm.cc
index 648c6285d..073f31df6 100644
--- a/sumi-mpi/mpi_api_comm.cc
+++ b/sumi-mpi/mpi_api_comm.cc
@@ -270,6 +270,7 @@ MpiApi::commSplit(MPI_Comm incomm, int color, int key, MPI_Comm *outcomm)
   start_comm_call(MPI_Comm_split,incomm);
   MpiComm* incommPtr = getComm(incomm);
   MpiComm* outcommPtr = comm_factory_.commSplit(incommPtr, color, key);
+  addCommPtr(outcommPtr, outcomm);
   mpi_api_debug(sprockit::dbg::mpi,
     "MPI_Comm_split(%s,%d,%d,*%s) exit",
   //but also assign an id to the underlying group
   if (outcommPtr->id() != MPI_COMM_NULL){
     outcommPtr->group()->setId(group_counter_++);
+    if (smp_optimize_){
+      outcommPtr->createSmpCommunicator(smp_neighbors_, engine(), Message::default_cq);
+    }
   }
+
+  endAPICall();
 
 #ifdef SSTMAC_OTF2_ENABLED
   if (otf2_writer_){
diff --git a/sumi-mpi/mpi_comm/mpi_comm.cc b/sumi-mpi/mpi_comm/mpi_comm.cc
index 790595d1a..4c73483f8 100644
--- a/sumi-mpi/mpi_comm/mpi_comm.cc
+++ b/sumi-mpi/mpi_comm/mpi_comm.cc
@@ -111,6 +111,20 @@ MpiComm::globalRankSetIntersection(const std::set<int> &neighbors) const
   }
 }
 
+std::set<int>
+MpiComm::commNeighbors(const std::set<int>& commWorldNeighbors) const
+{
+  std::set<int> ret;
+  for (int i=0; i < nproc(); ++i){
+    int glbl = peerTask(i);
+    auto iter = commWorldNeighbors.find(glbl);
+    if (iter != commWorldNeighbors.end()){
+      ret.insert(i);
+    }
+  }
+  return ret;
+}
+
 void
 MpiComm::deleteStatics()
 {
diff --git a/sumi-mpi/mpi_comm/mpi_comm.h b/sumi-mpi/mpi_comm/mpi_comm.h
index cbf00133d..2aeb30ef9 100644
--- a/sumi-mpi/mpi_comm/mpi_comm.h
+++ b/sumi-mpi/mpi_comm/mpi_comm.h
@@ -148,6 +148,8 @@ class MpiComm : public Communicator
     return int(peerTask(comm_rank));
   }
 
+  std::set<int> commNeighbors(const std::set<int>& commWorldNeighbors) const;
+
   int globalToCommRank(int global_rank) const override;
 
   int nproc() const override {
diff --git a/sumi/collective_actor.cc b/sumi/collective_actor.cc
index 8cd429523..825432b17 100644
--- a/sumi/collective_actor.cc
+++ b/sumi/collective_actor.cc
@@ -667,8 +667,9 @@ DagCollectiveActor::dataRecved(
   Action* ac = active_comms_[id];
   if (ac == nullptr){
     spkt_throw_printf(sprockit::ValueError,
-      "on %d, received data for unknown receive %u from %d on round %d",
-      dom_me_, id, msg->domSender(), msg->round());
+      "on %d, received data for unknown receive %u from %d on round %d\n%s",
+      dom_me_, id, msg->domSender(), msg->round(),
+      msg->toString().c_str());
   }
 
   dataRecved(ac, msg, recvd_buffer);
diff --git a/sumi/communicator.cc b/sumi/communicator.cc
index 175cac91a..2bc808a43 100644
--- a/sumi/communicator.cc
+++ b/sumi/communicator.cc
@@ -67,17 +67,22 @@ Communicator::createSmpCommunicator(const std::set<int>& neighbors, CollectiveEn
   auto neighbors_subset = globalRankSetIntersection(neighbors);
   if (neighbors_subset.size() == 1) return; //no smp parallelism
+
+  int myGlobalRank = commToGlobalRank(my_comm_rank_);
   if (neighbors_subset.size() > 1){ //there is smp parallelism to be had here
     int idx = 0;
     int my_smp_rank = 0;
     std::vector<int> local_to_global(neighbors_subset.size());
-    for (int rank : neighbors_subset){
-      local_to_global[idx] = rank;
-      if (rank == this->myCommRank()){
+    for (int glblRank : neighbors_subset){
+      local_to_global[idx] = glblRank;
+      if (glblRank == myGlobalRank){
        my_smp_rank = idx;
      }
      idx++;
    }
+
+
+    smp_comm_ = new MapCommunicator(my_smp_rank, std::move(local_to_global));
    if (smp_comm_->nproc() == 0){
      spkt_abort_printf("Created SMP communicator of size 0!"
@@ -86,19 +91,20 @@ Communicator::createSmpCommunicator(const std::set<int>& neighbors, CollectiveEn
   }
 
   std::vector<int> smp_ranks(this->nproc());
-  engine->allgather(smp_ranks.data(), &my_smp_rank, 1, sizeof(int), -2, cq_id);
+  int tag = -2;
+  engine->allgather(smp_ranks.data(), &my_smp_rank, 1, sizeof(int), tag, cq_id, this);
   engine->blockUntilNext(cq_id);
 
   std::map<int,int> rank_counts;
+  int my_owner_rank = -1;
   if (my_smp_rank == 0){
-    int my_owner_rank = -1;
     std::vector<int> owner_to_global;
     idx = 0;
     for (int rank=0; rank < this->nproc(); ++rank){
       int local_smp_rank = smp_ranks[rank];
       rank_counts[local_smp_rank]++;
       if (local_smp_rank == 0){
-        owner_to_global.push_back(rank);
+        owner_to_global.push_back(commToGlobalRank(rank));
         if (rank == this->myCommRank()){
           my_owner_rank = idx;
         }
diff --git a/sumi/sim_transport.cc b/sumi/sim_transport.cc
index 966d94956..edf25687c 100644
--- a/sumi/sim_transport.cc
+++ b/sumi/sim_transport.cc
@@ -689,9 +689,40 @@ CollectiveEngine::alltoall(void *dst, void *src, int nelems, int type_size, int
   if (!comm) comm = global_domain_;
 
-  DagCollective* coll = AllToAllCollective::getBuilderLibrary("macro")->getBuilder(alltoall_type_)
-        ->create(this, dst, src, nelems, type_size, tag, cq_id, comm);
-  return startCollective(coll);
+  auto* fact = AllToAllCollective::getBuilderLibrary("macro");
+  auto* builder = fact->getBuilder(alltoall_type_);
+  if (!builder){
+    spkt_abort_printf("invalid alltoall type requested: %s", alltoall_type_.c_str());
+  }
+
+  if (comm->smpComm() && comm->smpBalanced()){
+    int smpSize = comm->smpComm()->nproc();
+    void* intraDst = dst ? new char[nelems*type_size*smpSize] : nullptr;
+    int intra_tag = 1<<28 | tag;
+
+    BtreeGather* intra = new BtreeGather(this, 0, intraDst, src, smpSize*nelems,
+                                         type_size, intra_tag, cq_id, comm->smpComm());
+    DagCollective* prev;
+    if (comm->ownerComm()){
+      int inter_tag = 2<<28 | tag;
+      AllToAllCollective* inter = builder->create(this, dst, intraDst, smpSize*nelems,
+                                                  type_size, inter_tag, cq_id, comm->ownerComm());
+      intra->setSubsequent(inter);
+      prev = inter;
+    } else {
+      prev = intra;
+    }
+    int bcast_tag = 3<<28 | tag;
+    auto* bcast = new BinaryTreeBcastCollective(this, 0, dst, comm->nproc()*nelems,
+                                                type_size, bcast_tag, cq_id, comm->smpComm());
+    prev->setSubsequent(bcast);
+    auto* final = new DoNothingCollective(this, tag, cq_id, comm);
+    bcast->setSubsequent(final);
+    return startCollective(intra);
+  } else {
+    AllToAllCollective* coll = builder->create(this, dst, src, nelems, type_size, tag, cq_id, comm);
+    return startCollective(coll);
+  }
 }
 
 CollectiveDoneMessage*
@@ -732,12 +763,14 @@
 
     int intra_tag = 1<<28 | tag;
+
     AllgatherCollective* intra = builder->create(this, intraDst, src, nelems,
                                                  type_size, intra_tag, cq_id, comm->smpComm());
     DagCollective* prev;
     if (comm->ownerComm()){
       int inter_tag = 2<<28 | tag;
+
       AllgatherCollective* inter = builder->create(this, dst, intraDst, smpSize*nelems,
                                                    type_size, inter_tag, cq_id, comm->ownerComm());
       intra->setSubsequent(inter);
diff --git a/tests/reference/test_core_apps_direct_alltoall.ref-out b/tests/reference/test_core_apps_direct_alltoall.ref-out
index 302c13f7e..665bc6d9e 100644
--- a/tests/reference/test_core_apps_direct_alltoall.ref-out
+++ b/tests/reference/test_core_apps_direct_alltoall.ref-out
@@ -1 +1 @@
-Estimated total runtime of 0.01380041 seconds
+Estimated total runtime of 0.00209360 seconds
diff --git a/tests/reference/test_core_apps_smp_collectives_optimized.ref-out b/tests/reference/test_core_apps_smp_collectives_optimized.ref-out
index 7c22f8805..92d410278 100644
--- a/tests/reference/test_core_apps_smp_collectives_optimized.ref-out
+++ b/tests/reference/test_core_apps_smp_collectives_optimized.ref-out
@@ -1 +1 @@
-Estimated total runtime of 0.00001778 seconds
+Estimated total runtime of 0.00386767 seconds
diff --git a/tests/reference/test_core_apps_smp_collectives_unoptimized.ref-out b/tests/reference/test_core_apps_smp_collectives_unoptimized.ref-out
index d0d6a5b23..8c9d17c27 100644
--- a/tests/reference/test_core_apps_smp_collectives_unoptimized.ref-out
+++ b/tests/reference/test_core_apps_smp_collectives_unoptimized.ref-out
@@ -1 +1 @@
-Estimated total runtime of 0.00002888 seconds
+Estimated total runtime of 0.12326445 seconds
diff --git a/tests/test_configs/test_smp_collectives_optimized.ini b/tests/test_configs/test_smp_collectives_optimized.ini
index aeb7f9129..8f0c03d9a 100644
--- a/tests/test_configs/test_smp_collectives_optimized.ini
+++ b/tests/test_configs/test_smp_collectives_optimized.ini
@@ -4,10 +4,25 @@ node {
   indexing = block
   allocation = first_available
   name = mpi_smp_collectives
-  launch_cmd = aprun -n 256 -N 4
+  launch_cmd = aprun -n 64 -N 4
   start = 0ms
   message_size = 1KB
   mpi {
+   alltoall = bruck
+   allgather = bruck
+   smp_optimize = true
+  }
+ }
+ app2 {
+  indexing = block
+  allocation = first_available
+  name = mpi_smp_collectives
+  launch_cmd = aprun -n 64 -N 4
+  start = 0ms
+  message_size = 1KB
+  mpi {
+   alltoall = direct
+   allgather = ring
    smp_optimize = true
   }
  }
@@ -64,7 +79,7 @@ switch {
 
 topology {
  name = torus
-geometry = [4,2,4]
+geometry = [4,3,4]
 concentration = 2
 }
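
For readers following the new CollectiveEngine::alltoall path above: when the communicator has a balanced SMP layout, the single alltoall is replaced by three chained sub-collectives: a BtreeGather onto each node's rank 0, an alltoall among those node leaders over the ownerComm, and a BinaryTreeBcastCollective back within each node. The sketch below reproduces that traffic pattern as a plain sstmac-style skeleton. It is an illustration only, assuming block indexing with 4 ranks per node (matching aprun -N 4 above); the ranksPerNode constant is hypothetical, and the null payload buffers are legal only in the simulator, never in a real MPI program.

#include <mpi.h>

int USER_MAIN(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int me, nproc;
  MPI_Comm_rank(MPI_COMM_WORLD, &me);
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);

  const int ranksPerNode = 4; //assumption: matches aprun -N 4 with block indexing
  const int nelems = 1000;    //per-destination element count, as in the skeleton

  //intra-node communicator, analogous to comm->smpComm()
  MPI_Comm nodeComm;
  MPI_Comm_split(MPI_COMM_WORLD, me / ranksPerNode, me, &nodeComm);
  int nodeRank;
  MPI_Comm_rank(nodeComm, &nodeRank);

  //node-leader communicator, analogous to comm->ownerComm()
  MPI_Comm leaderComm;
  MPI_Comm_split(MPI_COMM_WORLD, nodeRank == 0 ? 0 : MPI_UNDEFINED, me, &leaderComm);

  //phase 1: gather every local rank's full send buffer onto the node leader
  MPI_Gather(nullptr, nproc*nelems, MPI_INT, nullptr, nproc*nelems, MPI_INT, 0, nodeComm);

  //phase 2: node leaders exchange the aggregated data; each node pair moves
  //ranksPerNode*ranksPerNode*nelems elements
  if (leaderComm != MPI_COMM_NULL){
    MPI_Alltoall(nullptr, ranksPerNode*ranksPerNode*nelems, MPI_INT,
                 nullptr, ranksPerNode*ranksPerNode*nelems, MPI_INT, leaderComm);
    MPI_Comm_free(&leaderComm);
  }

  //phase 3: broadcast the combined result back within the node; each rank
  //then owns its slice (the patch models this as comm->nproc()*nelems elements)
  MPI_Bcast(nullptr, nproc*nelems, MPI_INT, 0, nodeComm);

  MPI_Comm_free(&nodeComm);
  MPI_Finalize();
  return 0;
}

The same shape explains why the me % 2 split in the test skeleton still benefits: with block indexing at 4 ranks per node, each color keeps two ranks on every node, so commNeighbors and createSmpCommunicator can still build a balanced smpComm of size 2.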
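The intra_tag, inter_tag, and bcast_tag values above are derived by packing a phase id into the high bits of the user tag, so the three chained sub-collectives can run on the same completion queue without their messages matching one another. This works as long as user tags stay below 1<<28, which the patch appears to assume. A minimal illustration with a hypothetical tag value:

#include <cstdio>

int main()
{
  int tag = 42;                 //hypothetical user tag
  int intra_tag = 1<<28 | tag;  //intra-node gather phase
  int inter_tag = 2<<28 | tag;  //inter-node alltoall phase
  int bcast_tag = 3<<28 | tag;  //intra-node bcast phase
  //<< binds tighter than |, so these parse as (1<<28) | tag, etc.
  std::printf("%d %d %d\n", intra_tag, inter_tag, bcast_tag); //268435498 536870954 805306410
  return 0;
}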
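MpiComm::commNeighbors, added in mpi_comm.cc above, translates the set of COMM_WORLD ranks that share this node into the corresponding ranks of the sub-communicator. Below is a standalone rendering of the same loop, with peerTask replaced by an explicit local-to-global table; the example values are hypothetical and correspond to the me % 2 split under block indexing.

#include <cstdio>
#include <set>
#include <vector>

//Given the comm's local-to-global rank table and the set of COMM_WORLD
//ranks sharing the node, return the matching local ranks.
std::set<int> commNeighbors(const std::vector<int>& peerTask,
                            const std::set<int>& commWorldNeighbors)
{
  std::set<int> ret;
  for (int i=0; i < (int)peerTask.size(); ++i){
    if (commWorldNeighbors.count(peerTask[i])){
      ret.insert(i);
    }
  }
  return ret;
}

int main()
{
  //sub-communicator holding the even COMM_WORLD ranks, as produced by the
  //MPI_Comm_split(MPI_COMM_WORLD, me % 2, ...) call in the test skeleton
  std::vector<int> peerTask = {0, 2, 4, 6};
  //COMM_WORLD ranks 0-3 share node 0 under block indexing with -N 4
  std::set<int> nodeNeighbors = {0, 1, 2, 3};
  for (int local : commNeighbors(peerTask, nodeNeighbors)){
    std::printf("local rank %d is on this node\n", local); //prints 0 and 1
  }
  return 0;
}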