diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3d3ec1c --- /dev/null +++ b/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2019, Battelle Memorial Institute +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..21f77ba --- /dev/null +++ b/Makefile @@ -0,0 +1,36 @@ +CXX = mpicxx +# use -xmic-avx512 instead of -xHost for Intel Xeon Phi platforms +OPTFLAGS = -O3 -xHost -DPRINT_DIST_STATS -DPRINT_EXTRA_NEDGES +# -DPRINT_EXTRA_NEDGES prints extra edges when -p <> is passed to +# add extra edges randomly on a generated graph +# use export ASAN_OPTIONS=verbosity=1 to check ASAN output +SNTFLAGS = -std=c++11 -fsanitize=address -O1 -fno-omit-frame-pointer +CXXFLAGS = -std=c++11 -g $(OPTFLAGS) + +ENABLE_DUMPI_TRACE=0 +ENABLE_SCOREP_TRACE=0 + +ifeq ($(ENABLE_DUMPI_TRACE),1) + TRACERPATH = $(HOME)/builds/sst-dumpi/lib + LDFLAGS = -L$(TRACERPATH) -ldumpi +else ifeq ($(ENABLE_SCOREP_TRACE),1) + SCOREP_INSTALL_PATH = /usr/common/software/scorep/6.0/intel + INCLUDE = -I$(SCOREP_INSTALL_PATH)/include -I$(SCOREP_INSTALL_PATH)/include/scorep -DSCOREP_USER_ENABLE + LDAPP = $(SCOREP_INSTALL_PATH)/bin/scorep --user --nocompiler --noopenmp --nopomp --nocuda --noopenacc --noopencl --nomemory +endif + +OBJ = main.o +TARGET = neve + +all: $(TARGET) + +%.o: %.cpp + $(CXX) $(INCLUDE) $(CXXFLAGS) -c -o $@ $^ + +$(TARGET): $(OBJ) + $(LDAPP) $(CXX) -o $@ $^ + +.PHONY: clean + +clean: + rm -rf *~ *.dSYM nc.vg.* $(OBJ) $(TARGET) diff --git a/README b/README new file mode 100644 index 0000000..a772b7f --- /dev/null +++ b/README @@ -0,0 +1,133 @@ +*-*-*-*-*-*-*-*-* +| | | | | | | | | +*-*-*-neve-*-*-*- +| | | | | | | | | +*-*-*-*-*-*-*-*-* + +***** +About +***** + +`neve` is a microbenchmark that distributes a graph (real-world or synthetic) across +processes and then performs variable-sized message exchanges (equivalent to the number +of ghost vertices shared between processes) between process neighbors and reports +bandwidth/latency. 
Further, neve allows options to shrink a real-world graph (by a +percent of ghost vertices shared across processes), study performance of message exchanges +for a specific process neighborhood, analyze the impact of an edge-balanced real-world +graph distribution, and generate a graph in-memory. neve reports bandwidth/latency, and +follows the best practices of existing MPI microbenchmarks. + +We require the total number of processes to be a power of 2 and total number of vertices +to be perfectly divisible by the number of processes when parallel RGG generation options +are used. This constraint does not apply to real world graphs passed to neve. + +neve has an in-memory random geometric graph generator (just like our other mini-application, +miniVite[?]) that can be used for weak-scaling analysis. An n-D random geometric graph (RGG) +is generated by randomly placing N vertices in an n-D space and connecting pairs of vertices +whose Euclidean distance is less than or equal to d. We only consider 2D RGGs contained within +a unit square, [0,1]^2. We distribute the domain such that each process receives N/p vertices +(where p is the total number of processes). Each process owns (1 * 1/p) portion of the unit +square and d is computed as (please refer to Section 4 of miniVite paper[?] for details): + +d = (dc + dt)/2; +where, dc = sqrt(ln(N) / pi*N); dt = sqrt(2.0736 / pi*N) + +Therefore, the number of vertices (N) passed during execution on p processes must satisfy +the condition -- 1/p > d. Unlike miniVite, the edge weights of the generated graph is always +1.0, and not the Euclidean distance between vertices (because we only perform communication in +this microbenchmark, and no computation, so edge weights are irrelevant). + +[?] Ghosh, Sayan, Mahantesh Halappanavar, Antonio Tumeo, Ananth Kalyanaraman, and +Assefaw H. Gebremedhin. "miniVite: A Graph Analytics Benchmarking Tool for Massively +Parallel Systems." In 2018 IEEE/ACM Performance Modeling, Benchmarking and Simulation of +High Performance Computer Systems (PMBS), pp. 51-56. IEEE, 2018. + +Please note, the default distribution of graph generated from the in-built random +geometric graph generator causes a process to only communicate with its two +immediate neighbors. If you want to increase the communication intensity for +generated graphs, please use the "-p" option to specify an extra percentage of edges +that will be generated, linking random vertices. As a side-effect, this option +significantly increases the time required to generate the graph, therefore low values +are preferred. The max number of edges that can be added randomly must be less than +equal to INT_MAX, at present we don't handle cases in which "-p " resolves to +extra edges more than INT_MAX. + +We also allow users to pass any real world graph as input. However, we expect an input graph +to be in a certain binary format, which we have observed to be more efficient than reading +ASCII format files. The code for binary conversion (from a variety of common graph formats) +is packaged separately with another software called Vite, which is our distributed-memory +implementation of graph community detection. Please follow instructions in Vite README for +binary file conversion. 
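+
+As a quick illustration of the RGG threshold formula given earlier in this section,
+the following self-contained sketch (illustrative only, not part of neve; N and p are
+example values chosen here) computes d the same way and checks the 1/p > d
+requirement before attempting a run:
+
+#include <cassert>
+#include <cmath>
+#include <iostream>
+
+int main()
+{
+    const double N  = 1048576.0;  // total number of vertices (example value)
+    const double p  = 16.0;       // total number of processes (example value)
+    const double pi = 3.14159265358979323846;
+    const double dc = std::sqrt(std::log(N) / (pi * N));
+    const double dt = std::sqrt(2.0736 / (pi * N));
+    const double d  = (dc + dt) / 2.0;
+    std::cout << "d = " << d << ", 1/p = " << 1.0 / p << std::endl;
+    assert(1.0 / p > d);          // condition required by the 1D domain decomposition
+    return 0;
+}
+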
Vite could be downloaded from (please don't use the past PNNL/PNL +link to download Vite, the following GitHub link is the correct one): +https://github.com/Exa-Graph/vite + +Please contact the following for any queries or support: + +Sayan Ghosh, PNNL (sayan dot ghosh at pnnl dot gov) +Mahantesh Halappanavar, PNNL (hala at pnnl dot gov) + +******* +Compile +******* + +neve is a C++ header-only library and requires an MPI implementation. It uses MPI Send/Recv and +collectives (for synthetic graph generation). Please update the Makefile with compiler flags and +use a C++11 compliant compiler of your choice. Invoke `make clean; make` after setting paths +to MPI for generating the binary. Use `mpirun` or `mpiexec` or `srun` to execute the code with +specific runtime arguments mentioned in the next section. + +Pass -DPRINT_DIST_STATS while building for printing distributed graph characteristics. + +***************** +Execution options +***************** +E.g.: +mpiexec -n 2 bin/./neve -f karate.bin -w -t 0 -z 5 +mpiexec -n 4 bin/./neve -f karate.bin -w -t 0 -s 2 -g 5 +mpiexec -n 2 bin/./neve -l -n 100 -w +mpiexec -n 2 bin/./neve -n 100 -t 0 +mpiexec -n 2 bin/./neve -p 2 -n 100 -w + +Possible options (can be combined): + +1. -f : Specify input binary file after this argument. +2. -b : Only valid for real-world inputs. Attempts to distribute approximately + equal number of edges among processes. Irregular number of vertices + owned by a particular process. Increases the distributed graph creation + time due to serial overheads, but may improve overall execution time. +3. -n : Only valid for synthetically generated inputs. Pass total number of + vertices of the generated graph. +4. -l : Use distributed LCG for randomly choosing edges. If this option + is not used, we will use C++ random number generator (using + std::default_random_engine). +5. -p : Only valid for synthetically generated inputs. Specify percent of overall + edges to be randomly generated between processes. +6. -r : This is used to control the number of aggregators in MPI I/O and is + meaningful when an input binary graph file is passed with option "-f". + naggr := (nranks > 1) ? (nprocs/nranks) : nranks; +7 -w : Report Bandwidth in MB/s[*]. +8. -t <0|1|2> : Report Latency in microseconds[*]. Option '0' uses nonblocking Send/Recv, + Option '1' uses MPI_Neighbor_alltoall and Option '2' uses MPI_Neighbor_allgather. +9. -x : Maximum data exchange size (in bytes). +10. -m : Minimum data exchange size (in bytes). +11. -s : Analyze a single process neighborhood. Performs bidirectional message + exchanges between the process neighbors of a particular PE passed by the + user. This transforms the neighbor subgraph of a particular PE as a fully + connected graph. +12. -g : Specify maximum number of ghosts shared between process neighbors of a + particular PE. This option is only valid when -s is passed (see #11). +13. -z : Select a percentage of ghost vertices of a real-world graph distributed + across processes as actual ghosts. For e.g., lets assume that a graph is + distributed across 4 processes, and PE#0 has two neighbors, PE#1 and PE#3 + with whom it shares a 100 ghost vertices each. In such a configuration, + the b/w test would perform variable-sized message exchanges a 100 times + between PE#0 and {PE#1, PE#3}. The -z option can limit the + number of message exchanges by selecting a percentage of the actual the number + of ghosts, but maintaining the overall structure of the original graph. 
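+                 (For instance, passing "-z 5" in the scenario above would retain 5% of the
+                 ghosts, so PE#0 would perform only 5 message exchanges each with PE#1 and
+                 PE#3 instead of 100.)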
+ Hence, this option can help 'shrink' a graph. Only valid for b/w test, because + latency test will just perform message transfers between process neighborhoods. + +[*]Note: Unless -w or -t <...> is passed, the code will just load the graph and create the graph data +structure. This is deliberate, in case we want to measure the overhead of loading a graph and +time only the file I/O part (like measuring the impact of different #aggregators through the +-r option). diff --git a/comm.hpp b/comm.hpp new file mode 100644 index 0000000..de6cc29 --- /dev/null +++ b/comm.hpp @@ -0,0 +1,1287 @@ +// *********************************************************************** +// +// NEVE +// +// *********************************************************************** +// +// Copyright (2019) Battelle Memorial Institute +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +// FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +// COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +// BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. 
+// +// ************************************************************************ +#pragma once +#ifndef COMM_HPP +#define COMM_HPP + +#include "graph.hpp" + +#include +#include +#include +#include + +#if defined(SCOREP_USER_ENABLE) +#include +#endif + +#define MAX_SIZE (1<<22) +#define MIN_SIZE (0) +#define LARGE_SIZE 8192 +#define ZCI (1.96) + +#define BW_LOOP_COUNT 100 +#define BW_SKIP_COUNT 10 +#define BW_LOOP_COUNT_LARGE 20 +#define BW_SKIP_COUNT_LARGE 2 + +#define LT_LOOP_COUNT 10000 +#define LT_SKIP_COUNT 100 +#define LT_LOOP_COUNT_LARGE 1000 +#define LT_SKIP_COUNT_LARGE 10 + +class Comm +{ + public: + +#define COMM_COMMON() \ + do { \ + comm_ = g_->get_comm(); \ + MPI_Comm_size(comm_, &size_); \ + MPI_Comm_rank(comm_, &rank_); \ + const GraphElem lne = g_->get_lne(); \ + lnv_ = g_->get_lnv(); \ + std::vector a2a_send_dat(size_), a2a_recv_dat(size_); \ + /* track outgoing ghosts not owned by me */ \ + for (GraphElem i = 0; i < lnv_; i++) \ + { \ + GraphElem e0, e1; \ + g_->edge_range(i, e0, e1); \ + for (GraphElem e = e0; e < e1; e++) \ + { \ + Edge const& edge = g_->get_edge(e); \ + const int owner = g_->get_owner(edge.tail_); \ + if (owner != rank_) \ + { \ + if (std::find(targets_.begin(), targets_.end(), owner) == targets_.end()) \ + { \ + targets_.push_back(owner); \ + target_pindex_.insert({owner, outdegree_}); \ + outdegree_++; \ + nghosts_in_target_.push_back(0); \ + } \ + out_nghosts_++; \ + a2a_send_dat[owner]++; \ + nghosts_in_target_[target_pindex_[owner]]++; \ + } \ + } \ + } \ + assert(outdegree_ == nghosts_in_target_.size()); \ + if (shrinkp_ > 0.0) \ + { \ + GraphElem new_nghosts = 0; \ + std::unordered_map::iterator peit = target_pindex_.begin(); \ + for (int p = 0; p < outdegree_; p++) \ + { \ + nghosts_in_target_[p] = (int)((shrinkp_ * (float)nghosts_in_target_[p]) / (float)100); \ + if (nghosts_in_target_[p] == 0) \ + nghosts_in_target_[p] = 1; \ + new_nghosts += nghosts_in_target_[p]; \ + a2a_send_dat[peit->first] = nghosts_in_target_[p]; \ + ++peit; \ + } \ + GraphElem nghosts[2] = {out_nghosts_, new_nghosts}, all_nghosts[2] = {0, 0}; \ + MPI_Reduce(nghosts, all_nghosts, 2, MPI_GRAPH_TYPE, MPI_SUM, 0, comm_); \ + out_nghosts_ = new_nghosts; \ + if (rank_ == 0) \ + { \ + std::cout << "Considering only " << shrinkp_ << "% of overall #ghosts, previous total outgoing #ghosts: " \ + << all_nghosts[0] << ", current total outgoing #ghosts: " << all_nghosts[1] << std::endl; \ + } \ + } \ + /* track incoming communication (processes for which I am a ghost) */ \ + /* send to PEs in targets_ list about shared ghost info */ \ + MPI_Alltoall(a2a_send_dat.data(), 1, MPI_GRAPH_TYPE, a2a_recv_dat.data(), 1, MPI_GRAPH_TYPE, comm_); \ + MPI_Barrier(comm_); \ + for (int p = 0; p < size_; p++) \ + { \ + if (a2a_recv_dat[p] > 0) \ + { \ + source_pindex_.insert({p, indegree_}); \ + sources_.push_back(p); \ + nghosts_in_source_.push_back(a2a_recv_dat[p]); \ + indegree_++; \ + in_nghosts_ += a2a_recv_dat[p]; \ + } \ + } \ + assert(indegree_ == nghosts_in_source_.size()); \ + sbuf_ = new char[out_nghosts_*max_size_]; \ + rbuf_ = new char[in_nghosts_*max_size_]; \ + assert(in_nghosts_ >= indegree_); \ + assert(out_nghosts_ >= outdegree_); \ + sreq_ = new MPI_Request[out_nghosts_]; \ + rreq_ = new MPI_Request[in_nghosts_]; \ + /* for large graphs, if iteration counts are not reduced it takes >> time */\ + if (lne > 1000) \ + { \ + if (bw_loop_count_ == BW_LOOP_COUNT) \ + bw_loop_count_ = bw_loop_count_large_; \ + if (bw_skip_count_ == BW_SKIP_COUNT) \ + bw_skip_count_ = 
bw_skip_count_large_; \ + } \ + a2a_send_dat.clear(); \ + a2a_recv_dat.clear(); \ + /* create graph topology communicator for neighbor collectives */ \ + MPI_Dist_graph_create_adjacent(comm_, sources_.size(), sources_.data(), \ + MPI_UNWEIGHTED, targets_.size(), targets_.data(), MPI_UNWEIGHTED, \ + MPI_INFO_NULL, 0 , &nbr_comm_); \ + /* following is not necessary, just checking */ \ + int weighted, indeg, outdeg; \ + MPI_Dist_graph_neighbors_count(nbr_comm_, &indeg, &outdeg, &weighted); \ + assert(indegree_ == sources_.size()); \ + assert(outdegree_ == targets_.size()); \ + } while(0) + + explicit Comm(Graph* g): + g_(g), comm_(MPI_COMM_NULL), nbr_comm_(MPI_COMM_NULL), + in_nghosts_(0), out_nghosts_(0), lnv_(0), + target_pindex_(0), source_pindex_(0), + nghosts_in_target_(0), nghosts_in_source_(0), + sbuf_(nullptr), rbuf_(nullptr), + sreq_(nullptr), rreq_(nullptr), + max_size_(MAX_SIZE), min_size_(MIN_SIZE), + large_msg_size_(LARGE_SIZE), + bw_loop_count_(BW_LOOP_COUNT), + bw_loop_count_large_(BW_LOOP_COUNT_LARGE), + bw_skip_count_(BW_SKIP_COUNT), + bw_skip_count_large_(BW_SKIP_COUNT_LARGE), + lt_loop_count_(LT_LOOP_COUNT), + lt_loop_count_large_(LT_LOOP_COUNT_LARGE), + lt_skip_count_(LT_SKIP_COUNT), + lt_skip_count_large_(LT_SKIP_COUNT_LARGE), + targets_(0), sources_(0), indegree_(0), outdegree_(0) + { COMM_COMMON(); } + + explicit Comm(Graph* g, GraphElem min_size, GraphElem max_size, float shrink_percent): + g_(g), comm_(MPI_COMM_NULL), nbr_comm_(MPI_COMM_NULL), + in_nghosts_(0), out_nghosts_(0), lnv_(0), + target_pindex_(0), source_pindex_(0), + nghosts_in_target_(0), nghosts_in_source_(0), + sbuf_(nullptr), rbuf_(nullptr), + sreq_(nullptr), rreq_(nullptr), + min_size_(min_size), max_size_(max_size), + large_msg_size_(LARGE_SIZE), + bw_loop_count_(BW_LOOP_COUNT), + bw_loop_count_large_(BW_LOOP_COUNT_LARGE), + bw_skip_count_(BW_SKIP_COUNT), + bw_skip_count_large_(BW_SKIP_COUNT_LARGE), + lt_loop_count_(LT_LOOP_COUNT), + lt_loop_count_large_(LT_LOOP_COUNT_LARGE), + lt_skip_count_(LT_SKIP_COUNT), + lt_skip_count_large_(LT_SKIP_COUNT_LARGE), + targets_(0), sources_(0), indegree_(0), outdegree_(0), + shrinkp_(shrink_percent) + { COMM_COMMON(); } + + explicit Comm(Graph* g, + GraphElem max_size, GraphElem min_size, + GraphElem large_msg_size, + int bw_loop_count, int bw_loop_count_large, + int bw_skip_count, int bw_skip_count_large, + int lt_loop_count, int lt_loop_count_large, + int lt_skip_count, int lt_skip_count_large): + g_(g), comm_(MPI_COMM_NULL), nbr_comm_(MPI_COMM_NULL), + in_nghosts_(0), out_nghosts_(0), lnv_(0), + target_pindex_(0), source_pindex_(0), + nghosts_in_target_(0), nghosts_in_source_(0), + sbuf_(nullptr), rbuf_(nullptr), + sreq_(nullptr), rreq_(nullptr), + max_size_(max_size), min_size_(min_size), + large_msg_size_(large_msg_size), + bw_loop_count_(bw_loop_count), + bw_loop_count_large_(bw_loop_count_large), + bw_skip_count_(bw_skip_count), + bw_skip_count_large_(bw_skip_count_large), + lt_loop_count_(lt_loop_count), + lt_loop_count_large_(lt_loop_count_large), + lt_skip_count_(lt_skip_count), + lt_skip_count_large_(lt_skip_count_large), + targets_(0), sources_(0), indegree_(0), outdegree_(0) + { COMM_COMMON(); } + + // destroy graph topology communicator + void destroy_nbr_comm() + { + if (nbr_comm_ != MPI_COMM_NULL) + MPI_Comm_free(&nbr_comm_); + }; + + ~Comm() + { + targets_.clear(); + target_pindex_.clear(); + nghosts_in_target_.clear(); + sources_.clear(); + source_pindex_.clear(); + nghosts_in_source_.clear(); + + delete []sbuf_; + delete []rbuf_; + delete []sreq_; 
+ delete []rreq_; + } + + void touch_buffers(GraphElem const& size) + { + std::memset(sbuf_, 'a', out_nghosts_*size); + std::memset(rbuf_, 'b', in_nghosts_*size); + } + + // kernel for bandwidth + // (extra s/w overhead for determining + // owner and accessing CSR) + inline void comm_kernel_bw_extra_overhead(GraphElem const& size) + { + // prepost recvs + for (GraphElem g = 0; g < in_nghosts_; g++) + { + MPI_Irecv(&rbuf_[g*size], size, MPI_CHAR, sources_[g], + g, comm_, rreq_ + g); + } + + // sends + GraphElem ng = 0; + for (GraphElem i = 0; i < lnv_; i++) + { + GraphElem e0, e1; + g_->edge_range(i, e0, e1); + + for (GraphElem e = e0; e < e1; e++) + { + Edge const& edge = g_->get_edge(e); + const int owner = g_->get_owner(edge.tail_); + if (owner != rank_) + { + MPI_Isend(&sbuf_[ng*size], size, MPI_CHAR, owner, + ng, comm_, sreq_+ ng); + ng++; + } + } + } + + MPI_Waitall(in_nghosts_, rreq_, MPI_STATUSES_IGNORE); + MPI_Waitall(out_nghosts_, sreq_, MPI_STATUSES_IGNORE); + } + + // kernel for latency using MPI Isend/Irecv + inline void comm_kernel_lt(GraphElem const& size) + { + for (int p = 0; p < indegree_; p++) + { + MPI_Irecv(rbuf_, size, MPI_CHAR, sources_[p], 100, comm_, rreq_ + p); + } + + + for (int p = 0; p < outdegree_; p++) + { + MPI_Isend(sbuf_, size, MPI_CHAR, targets_[p], 100, comm_, sreq_ + p); + } + + MPI_Waitall(indegree_, rreq_, MPI_STATUSES_IGNORE); + MPI_Waitall(outdegree_, sreq_, MPI_STATUSES_IGNORE); + } + +#if defined(TEST_LT_MPI_PROC_NULL) + // same as above, but replaces target with MPI_PROC_NULL to + // measure software overhead + inline void comm_kernel_lt_pnull(GraphElem const& size) + { + for (int p = 0; p < indegree_; p++) + { + MPI_Irecv(rbuf_, size, MPI_CHAR, MPI_PROC_NULL, 100, comm_, rreq_ + p); + } + + + for (int p = 0; p < outdegree_; p++) + { + MPI_Isend(sbuf_, size, MPI_CHAR, MPI_PROC_NULL, 100, comm_, sreq_ + p); + } + + MPI_Waitall(indegree_, rreq_, MPI_STATUSES_IGNORE); + MPI_Waitall(outdegree_, sreq_, MPI_STATUSES_IGNORE); + } +#endif + + // kernel for latency using MPI Isend/Irecv (invokes usleep) + inline void comm_kernel_lt_usleep(GraphElem const& size) + { + for (int p = 0; p < indegree_; p++) + { + MPI_Irecv(rbuf_, size, MPI_CHAR, sources_[p], 100, comm_, rreq_ + p); + } + + usleep(lnv_); + + for (int p = 0; p < outdegree_; p++) + { + MPI_Isend(sbuf_, size, MPI_CHAR, targets_[p], 100, comm_, sreq_ + p); + } + + MPI_Waitall(indegree_, rreq_, MPI_STATUSES_IGNORE); + MPI_Waitall(outdegree_, sreq_, MPI_STATUSES_IGNORE); + } + +#if defined(TEST_LT_MPI_PROC_NULL) + // same as above, but replaces target with MPI_PROC_NULL to + // measure software overhead (invokes usleep) + inline void comm_kernel_lt_pnull_usleep(GraphElem const& size) + { + for (int p = 0; p < indegree_; p++) + { + MPI_Irecv(rbuf_, size, MPI_CHAR, MPI_PROC_NULL, 100, comm_, rreq_ + p); + } + + usleep(lnv_); + + for (int p = 0; p < outdegree_; p++) + { + MPI_Isend(sbuf_, size, MPI_CHAR, MPI_PROC_NULL, 100, comm_, sreq_ + p); + } + + MPI_Waitall(indegree_, rreq_, MPI_STATUSES_IGNORE); + MPI_Waitall(outdegree_, sreq_, MPI_STATUSES_IGNORE); + } +#endif + + // kernel for latency using MPI_Neighbor_alltoall + inline void comm_kernel_lt_ala(GraphElem const& size) + { MPI_Neighbor_alltoall(sbuf_, size, MPI_CHAR, rbuf_, size, MPI_CHAR, nbr_comm_); } + + // kernel for latency using MPI_Neighbor_allgather + inline void comm_kernel_lt_aga(GraphElem const& size) + { MPI_Neighbor_allgather(sbuf_, size, MPI_CHAR, rbuf_, size, MPI_CHAR, nbr_comm_); } + + // kernel for latency with extra input 
paragathers + inline void comm_kernel_lt(GraphElem const& size, GraphElem const& npairs, + MPI_Comm gcomm, int const& me) + { + for (int p = 0, j = 0; p < npairs; p++) + { + if (p != me) + { + MPI_Irecv(rbuf_, size, MPI_CHAR, p, 100, gcomm, rreq_ + j); + j++; + } + } + + for (int p = 0, j = 0; p < npairs; p++) + { + if (p != me) + { + MPI_Isend(sbuf_, size, MPI_CHAR, p, 100, gcomm, sreq_ + j); + j++; + } + } + + MPI_Waitall(npairs-1, rreq_, MPI_STATUSES_IGNORE); + MPI_Waitall(npairs-1, sreq_, MPI_STATUSES_IGNORE); + } + + // kernel for bandwidth + inline void comm_kernel_bw(GraphElem const& size) + { + GraphElem rng = 0, sng = 0; + + // prepost recvs + for (int p = 0; p < indegree_; p++) + { + for (GraphElem g = 0; g < nghosts_in_source_[p]; g++) + { + MPI_Irecv(&rbuf_[rng*size], size, MPI_CHAR, sources_[p], g, comm_, rreq_ + rng); + rng++; + } + } + + // sends + for (int p = 0; p < outdegree_; p++) + { + for (GraphElem g = 0; g < nghosts_in_target_[p]; g++) + { + MPI_Isend(&sbuf_[sng*size], size, MPI_CHAR, targets_[p], g, comm_, sreq_+ sng); + sng++; + } + } + + MPI_Waitall(in_nghosts_, rreq_, MPI_STATUSES_IGNORE); + MPI_Waitall(out_nghosts_, sreq_, MPI_STATUSES_IGNORE); + } + + // kernel for bandwidth with extra input parameters and + // out of order receives + inline void comm_kernel_bw(GraphElem const& size, GraphElem const& npairs, + MPI_Comm gcomm, GraphElem const& avg_ng, int const& me) + { + // prepost recvs + for (int p = 0, j = 0; p < npairs; p++) + { + if (p != me) + { + for (GraphElem g = 0; g < avg_ng; g++) + { + MPI_Irecv(&rbuf_[j*size], size, MPI_CHAR, MPI_ANY_SOURCE, j, gcomm, rreq_ + j); + j++; + } + } + } + + // sends + for (int p = 0, j = 0; p < npairs; p++) + { + if (p != me) + { + for (GraphElem g = 0; g < avg_ng; g++) + { + MPI_Isend(&sbuf_[j*size], size, MPI_CHAR, p, j, gcomm, sreq_+ j); + j++; + } + } + } + + MPI_Waitall(avg_ng*(npairs-1), rreq_, MPI_STATUSES_IGNORE); + MPI_Waitall(avg_ng*(npairs-1), sreq_, MPI_STATUSES_IGNORE); + } + + // Bandwidth tests + void p2p_bw() + { + double t, t_start, t_end, sum_t = 0.0; + int loop = bw_loop_count_, skip = bw_skip_count_; + + // total communicating pairs + int sum_npairs = outdegree_ + indegree_; + MPI_Allreduce(MPI_IN_PLACE, &sum_npairs, 1, MPI_INT, MPI_SUM, comm_); + sum_npairs /= 2; + + // find average number of ghost vertices + GraphElem sum_ng = out_nghosts_ + in_nghosts_, avg_ng; + MPI_Allreduce(MPI_IN_PLACE, &sum_ng, 1, MPI_GRAPH_TYPE, MPI_SUM, comm_); + avg_ng = sum_ng / sum_npairs; + + if(rank_ == 0) + { + std::cout << "--------------------------------" << std::endl; + std::cout << "--------Bandwidth test----------" << std::endl; + std::cout << "--------------------------------" << std::endl; + std::cout << std::setw(12) << "# Bytes" << std::setw(13) << "MB/s" + << std::setw(13) << "Msg/s" + << std::setw(18) << "Variance" + << std::setw(15) << "STDDEV" + << std::setw(16) << "95% CI" + << std::endl; + } + + for (GraphElem size = (!min_size_ ? 
1 : min_size_); size <= max_size_; size *= 2) + { + // memset + touch_buffers(size); + + if(size > large_msg_size_) + { + loop = bw_loop_count_large_; + skip = bw_skip_count_large_; + } + +#if defined(SCOREP_USER_ENABLE) + SCOREP_RECORDING_ON(); +#endif + // time communication kernel + for (int l = 0; l < loop + skip; l++) + { + if (l == skip) + { + MPI_Barrier(comm_); + t_start = MPI_Wtime(); + } + + comm_kernel_bw(size); + } + +#if defined(SCOREP_USER_ENABLE) + SCOREP_RECORDING_OFF(); +#endif + t_end = MPI_Wtime(); + t = t_end - t_start; + + // execution time stats + MPI_Reduce(&t, &sum_t, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + double avg_t = sum_t / sum_npairs; + double avg_st = sum_t / size_; + double t_sq = t*t; + double sum_tsq = 0; + MPI_Reduce(&t_sq, &sum_tsq, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + double avg_tsq = sum_tsq / size_; + double var = avg_tsq - (avg_st*avg_st); + double stddev = sqrt(var); + + if (rank_ == 0) + { + double tmp = size / 1e6 * loop * avg_ng; + sum_t /= sum_npairs; + double bw = tmp / sum_t; + + std::cout << std::setw(10) << size << std::setw(15) << bw + << std::setw(15) << 1e6 * bw / size + << std::setw(18) << var + << std::setw(16) << stddev + << std::setw(16) << stddev * ZCI / sqrt((double)loop * avg_ng) + << std::endl; + } + } + } + + // no extra loop, just communication among ghosts + void p2p_bw_hardskip() + { + double t, t_start, t_end, sum_t = 0.0; + + // total communicating pairs + int sum_npairs = outdegree_ + indegree_; + MPI_Allreduce(MPI_IN_PLACE, &sum_npairs, 1, MPI_INT, MPI_SUM, comm_); + sum_npairs /= 2; + + // find average number of ghost vertices + GraphElem sum_ng = out_nghosts_, avg_ng; + MPI_Allreduce(MPI_IN_PLACE, &sum_ng, 1, MPI_GRAPH_TYPE, MPI_SUM, comm_); + avg_ng = sum_ng / sum_npairs; + + if(rank_ == 0) + { + std::cout << "--------------------------------" << std::endl; + std::cout << "--------Bandwidth test----------" << std::endl; + std::cout << "--------------------------------" << std::endl; + std::cout << std::setw(12) << "# Bytes" << std::setw(13) << "MB/s" + << std::setw(13) << "Msg/s" + << std::setw(18) << "Variance" + << std::setw(15) << "STDDEV" + << std::setw(16) << "95% CI" + << std::endl; + } + + for (GraphElem size = (!min_size_ ? 
1 : min_size_); size <= max_size_; size *= 2) + { + // memset + touch_buffers(size); + + MPI_Barrier(comm_); + + t_start = MPI_Wtime(); + + comm_kernel_bw(size); + + t_end = MPI_Wtime(); + t = t_end - t_start; + + // execution time stats + MPI_Reduce(&t, &sum_t, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + double avg_t = sum_t / sum_npairs; + double avg_st = sum_t / size_; + double t_sq = t*t; + double sum_tsq = 0; + MPI_Reduce(&t_sq, &sum_tsq, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + double avg_tsq = sum_tsq / size_; + double var = avg_tsq - (avg_st*avg_st); + double stddev = sqrt(var); + + if (rank_ == 0) + { + double tmp = size / 1e6 * avg_ng; + sum_t /= sum_npairs; + double bw = tmp / sum_t; + + std::cout << std::setw(10) << size << std::setw(15) << bw + << std::setw(15) << 1e6 * bw / size + << std::setw(18) << var + << std::setw(16) << stddev + << std::setw(16) << stddev * ZCI / sqrt((double)avg_ng) + << std::endl; + } + } + } + + // Latency test using MPI Isend/Irecv + void p2p_lt() + { + double t, t_start, t_end, sum_t = 0.0; + int loop = lt_loop_count_, skip = lt_skip_count_; + + std::vector plat(size_); + int n99 = (int)std::ceil(0.99*size_); + + // total communicating pairs + int sum_npairs = outdegree_ + indegree_; + MPI_Allreduce(MPI_IN_PLACE, &sum_npairs, 1, MPI_INT, MPI_SUM, comm_); + sum_npairs /= 2; + + if(rank_ == 0) + { + std::cout << "--------------------------------" << std::endl; + std::cout << "----------Latency test----------" << std::endl; + std::cout << "--------------------------------" << std::endl; + std::cout << std::setw(12) << "# Bytes" << std::setw(15) << "Lat(us)" + << std::setw(16) << "Max(us)" + << std::setw(16) << "99%(us)" + << std::setw(16) << "Variance" + << std::setw(15) << "STDDEV" + << std::setw(16) << "95% CI" + << std::endl; + } + + for (GraphElem size = min_size_; size <= max_size_; size = (size ? size * 2 : 1)) + { + MPI_Barrier(comm_); + + if (size > large_msg_size_) + { + loop = lt_loop_count_large_; + skip = lt_skip_count_large_; + } + +#if defined(SCOREP_USER_ENABLE) + SCOREP_RECORDING_ON(); + SCOREP_USER_REGION_BY_NAME_BEGIN("TRACER_Loop", SCOREP_USER_REGION_TYPE_COMMON); + if (rank_ == 0) + SCOREP_USER_REGION_BY_NAME_BEGIN("TRACER_WallTime_MainLoop", SCOREP_USER_REGION_TYPE_COMMON); +#endif + // time communication kernel + for (int l = 0; l < loop + skip; l++) + { + if (l == skip) + { + t_start = MPI_Wtime(); + MPI_Barrier(comm_); + } + +#if defined(TEST_LT_MPI_PROC_NULL) + comm_kernel_lt_pnull(size); +#else + comm_kernel_lt(size); +#endif + } + +#if defined(SCOREP_USER_ENABLE) + if (rank_ == 0) + SCOREP_USER_REGION_BY_NAME_END("TRACER_WallTime_MainLoop"); + + SCOREP_USER_REGION_BY_NAME_END("TRACER_Loop"); + SCOREP_RECORDING_OFF(); +#endif + t_end = MPI_Wtime(); + t = (t_end - t_start) * 1.0e6 / (double)loop; + double t_sq = t*t; + double sum_tsq = 0; + + // execution time stats + MPI_Allreduce(&t, &sum_t, 1, MPI_DOUBLE, MPI_SUM, comm_); + MPI_Reduce(&t_sq, &sum_tsq, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + double avg_t = sum_t / (double) sum_npairs; + double avg_st = sum_t / (double) size_; // no. 
of observations + double avg_tsq = sum_tsq / (double) size_; + double var = avg_tsq - (avg_st*avg_st); + double stddev = sqrt(var); + + double lmax = 0.0; + MPI_Reduce(&t, &lmax, 1, MPI_DOUBLE, MPI_MAX, 0, comm_); + MPI_Gather(&t, 1, MPI_DOUBLE, plat.data(), 1, MPI_DOUBLE, 0, comm_); + + if (rank_ == 0) + { + std::sort(plat.begin(), plat.end()); + std::cout << std::setw(10) << size << std::setw(17) << avg_t + << std::setw(16) << lmax/2.0 + << std::setw(16) << plat[n99-1]/2.0 + << std::setw(16) << var + << std::setw(16) << stddev + << std::setw(16) << stddev * ZCI / sqrt((double)loop * sum_npairs) + << std::endl; + } + } + plat.clear(); + } + + // Latency test using MPI Isend/Irecv, including usleep + void p2p_lt_usleep() + { + double t, t_start, t_end, sum_t = 0.0; + int loop = lt_loop_count_, skip = lt_skip_count_; + + std::vector plat(size_); + int n99 = (int)std::ceil(0.99*size_); + + // total communicating pairs + int sum_npairs = outdegree_ + indegree_; + MPI_Allreduce(MPI_IN_PLACE, &sum_npairs, 1, MPI_INT, MPI_SUM, comm_); + sum_npairs /= 2; + + if(rank_ == 0) + { + std::cout << "--------------------------------" << std::endl; + std::cout << "-----Latency test (w usleep)----" << std::endl; + std::cout << "--------------------------------" << std::endl; + std::cout << std::setw(12) << "# Bytes" << std::setw(15) << "Lat(us)" + << std::setw(16) << "Max(us)" + << std::setw(16) << "99%(us)" + << std::setw(16) << "Variance" + << std::setw(15) << "STDDEV" + << std::setw(16) << "95% CI" + << std::endl; + } + + for (GraphElem size = min_size_; size <= max_size_; size = (size ? size * 2 : 1)) + { + MPI_Barrier(comm_); + + if (size > large_msg_size_) + { + loop = lt_loop_count_large_; + skip = lt_skip_count_large_; + } + +#if defined(SCOREP_USER_ENABLE) + SCOREP_RECORDING_ON(); + SCOREP_USER_REGION_BY_NAME_BEGIN("TRACER_Loop", SCOREP_USER_REGION_TYPE_COMMON); + if (rank_ == 0) + SCOREP_USER_REGION_BY_NAME_BEGIN("TRACER_WallTime_MainLoop", SCOREP_USER_REGION_TYPE_COMMON); +#endif + // time communication kernel + for (int l = 0; l < loop + skip; l++) + { + if (l == skip) + { + t_start = MPI_Wtime(); + MPI_Barrier(comm_); + } + +#if defined(TEST_LT_MPI_PROC_NULL) + comm_kernel_lt_pnull_usleep(size); +#else + comm_kernel_lt_usleep(size); +#endif + } + +#if defined(SCOREP_USER_ENABLE) + if (rank_ == 0) + SCOREP_USER_REGION_BY_NAME_END("TRACER_WallTime_MainLoop"); + + SCOREP_USER_REGION_BY_NAME_END("TRACER_Loop"); + SCOREP_RECORDING_OFF(); +#endif + t_end = MPI_Wtime(); + t = (t_end - t_start) * 1.0e6 / (double)loop; + double t_sq = t*t; + double sum_tsq = 0; + + // execution time stats + MPI_Allreduce(&t, &sum_t, 1, MPI_DOUBLE, MPI_SUM, comm_); + MPI_Reduce(&t_sq, &sum_tsq, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + double avg_t = sum_t / (double) sum_npairs; + double avg_st = sum_t / (double) size_; // no. 
of observations + double avg_tsq = sum_tsq / (double) size_; + double var = avg_tsq - (avg_st*avg_st); + double stddev = sqrt(var); + + double lmax = 0.0; + MPI_Reduce(&t, &lmax, 1, MPI_DOUBLE, MPI_MAX, 0, comm_); + MPI_Gather(&t, 1, MPI_DOUBLE, plat.data(), 1, MPI_DOUBLE, 0, comm_); + + if (rank_ == 0) + { + std::sort(plat.begin(), plat.end()); + std::cout << std::setw(10) << size << std::setw(17) << avg_t + << std::setw(16) << lmax/2.0 + << std::setw(16) << plat[n99-1]/2.0 + << std::setw(16) << var + << std::setw(16) << stddev + << std::setw(16) << stddev * ZCI / sqrt((double)loop * sum_npairs) + << std::endl; + } + } + plat.clear(); + } + + // Latency test using all-to-all among graph neighbors + void nbr_ala_lt() + { + double t, t_start, t_end, sum_t = 0.0; + int loop = lt_loop_count_, skip = lt_skip_count_; + + std::vector plat(size_); + int n99 = (int)std::ceil(0.99*size_); + + // total communicating pairs + int sum_npairs = outdegree_ + indegree_; + MPI_Allreduce(MPI_IN_PLACE, &sum_npairs, 1, MPI_INT, MPI_SUM, comm_); + sum_npairs /= 2; + + if(rank_ == 0) + { + std::cout << "---------------------------------" << std::endl; + std::cout << "----Latency test (All-To-All)----" << std::endl; + std::cout << "---------------------------------" << std::endl; + std::cout << std::setw(12) << "# Bytes" << std::setw(15) << "Lat(us)" + << std::setw(16) << "Max(us)" + << std::setw(16) << "99%(us)" + << std::setw(16) << "Variance" + << std::setw(15) << "STDDEV" + << std::setw(16) << "95% CI" + << std::endl; + } + + for (GraphElem size = min_size_; size <= max_size_; size = (size ? size * 2 : 1)) + { + MPI_Barrier(comm_); + + if (size > large_msg_size_) + { + loop = lt_loop_count_large_; + skip = lt_skip_count_large_; + } + + // time communication kernel + for (int l = 0; l < loop + skip; l++) + { + if (l == skip) + t_start = MPI_Wtime(); + + comm_kernel_lt_ala(size); + } + + t_end = MPI_Wtime(); + t = (t_end - t_start) * 1.0e6 / (double)loop; + double t_sq = t*t; + double sum_tsq = 0; + + // execution time stats + MPI_Allreduce(&t, &sum_t, 1, MPI_DOUBLE, MPI_SUM, comm_); + MPI_Reduce(&t_sq, &sum_tsq, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + double avg_t = sum_t / (double) size_; + double avg_tsq = sum_tsq / (double) size_; + double var = avg_tsq - (avg_t*avg_t); + double stddev = sqrt(var); + + double lmax = 0.0; + MPI_Reduce(&t, &lmax, 1, MPI_DOUBLE, MPI_MAX, 0, comm_); + MPI_Gather(&t, 1, MPI_DOUBLE, plat.data(), 1, MPI_DOUBLE, 0, comm_); + + if (rank_ == 0) + { + std::sort(plat.begin(), plat.end()); + std::cout << std::setw(10) << size << std::setw(17) << avg_t + << std::setw(16) << lmax + << std::setw(16) << plat[n99-1] + << std::setw(16) << var + << std::setw(16) << stddev + << std::setw(16) << stddev * ZCI / sqrt((double)loop * sum_npairs) + << std::endl; + } + } + plat.clear(); + } + + // Latency test using all-gather among graph neighbors + void nbr_aga_lt() + { + double t, t_start, t_end, sum_t = 0.0; + int loop = lt_loop_count_, skip = lt_skip_count_; + + std::vector plat(size_); + int n99 = (int)std::ceil(0.99*size_); + + // total communicating pairs + int sum_npairs = outdegree_ + indegree_; + MPI_Allreduce(MPI_IN_PLACE, &sum_npairs, 1, MPI_INT, MPI_SUM, comm_); + sum_npairs /= 2; + + if(rank_ == 0) + { + std::cout << "---------------------------------" << std::endl; + std::cout << "----Latency test (All-Gather)----" << std::endl; + std::cout << "---------------------------------" << std::endl; + std::cout << std::setw(12) << "# Bytes" << std::setw(15) << "Lat(us)" + << std::setw(16) << 
"Max(us)" + << std::setw(16) << "99%(us)" + << std::setw(16) << "Variance" + << std::setw(15) << "STDDEV" + << std::setw(16) << "95% CI" + << std::endl; + } + + for (GraphElem size = min_size_; size <= max_size_; size = (size ? size * 2 : 1)) + { + MPI_Barrier(comm_); + + if (size > large_msg_size_) + { + loop = lt_loop_count_large_; + skip = lt_skip_count_large_; + } + + // time communication kernel + for (int l = 0; l < loop + skip; l++) + { + if (l == skip) + t_start = MPI_Wtime(); + + comm_kernel_lt_aga(size); + } + + t_end = MPI_Wtime(); + t = (t_end - t_start) * 1.0e6 / (double)loop; + double t_sq = t*t; + double sum_tsq = 0; + + // execution time stats + MPI_Allreduce(&t, &sum_t, 1, MPI_DOUBLE, MPI_SUM, comm_); + MPI_Reduce(&t_sq, &sum_tsq, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + double avg_t = sum_t / (double) size_; + double avg_tsq = sum_tsq / (double) size_; + double var = avg_tsq - (avg_t*avg_t); + double stddev = sqrt(var); + + double lmax = 0.0; + MPI_Reduce(&t, &lmax, 1, MPI_DOUBLE, MPI_MAX, 0, comm_); + MPI_Gather(&t, 1, MPI_DOUBLE, plat.data(), 1, MPI_DOUBLE, 0, comm_); + + if (rank_ == 0) + { + std::sort(plat.begin(), plat.end()); + std::cout << std::setw(10) << size << std::setw(17) << avg_t + << std::setw(16) << lmax + << std::setw(16) << plat[n99-1] + << std::setw(16) << var + << std::setw(16) << stddev + << std::setw(16) << stddev * ZCI / sqrt((double)loop * sum_npairs) + << std::endl; + } + } + plat.clear(); + } + + // Bandwidth/Latency estimation by analyzing a + // single process neighborhood + + // Bandwidth test (single neighborhood) + void p2p_bw_snbr(int target_nbrhood, GraphElem max_ng = -1) + { + double t, t_start, t_end, sum_t = 0.0; + int loop = bw_loop_count_, skip = bw_skip_count_; + assert(target_nbrhood < size_); + + // extract process neighborhood of target_nbrhood PE + int tgt_deg = outdegree_; + int tgt_rank = MPI_UNDEFINED, tgt_size = 0; + MPI_Bcast(&tgt_deg, 1, MPI_INT, target_nbrhood, comm_); + + std::vector exl_tgt(tgt_deg+1); + if (rank_ == target_nbrhood) + std::copy(targets_.begin(), targets_.end(), exl_tgt.begin()); + + MPI_Bcast(exl_tgt.data(), tgt_deg, MPI_INT, target_nbrhood, comm_); + exl_tgt[tgt_deg] = target_nbrhood; + + // find average number of ghost vertices + GraphElem avg_ng, sum_ng = out_nghosts_; + MPI_Allreduce(MPI_IN_PLACE, &sum_ng, 1, MPI_GRAPH_TYPE, MPI_SUM, comm_); + avg_ng = sum_ng / tgt_deg; // number of pairs + + // override computed avg_ng + if (max_ng > 0) + { + avg_ng = std::min(avg_ng, max_ng); + if (rank_ == 0) + { + std::cout << "Number of ghost vertices set as: " << avg_ng << std::endl; + } + } + + // create new group/comm + MPI_Group cur_grp, nbr_grp; + MPI_Comm nbr_comm; + + MPI_Comm_group(comm_, &cur_grp); + MPI_Group_incl(cur_grp, tgt_deg+1, exl_tgt.data(), &nbr_grp); + MPI_Comm_create(comm_, nbr_grp, &nbr_comm); + + MPI_Group_rank(nbr_grp, &tgt_rank); + MPI_Group_size(nbr_grp, &tgt_size); + + if(rank_ == target_nbrhood) + { + std::cout << "------------------------------------------" << std::endl; + std::cout << "---Bandwidth test (single neighborhood)---" << std::endl; + std::cout << "------------------------------------------" << std::endl; + std::cout << std::setw(12) << "# Bytes" << std::setw(13) << "MB/s" + << std::setw(13) << "Msg/s" + << std::setw(18) << "Variance" + << std::setw(15) << "STDDEV" + << std::setw(16) << "95% CI" + << std::endl; + } + + // start communication only if belongs to the + // chosen process neighborhood + if (tgt_rank != MPI_UNDEFINED) + { + // readjust request buffer sizes and 
counts + delete []sreq_; + delete []rreq_; + sreq_ = new MPI_Request[tgt_deg*avg_ng]; + rreq_ = new MPI_Request[tgt_deg*avg_ng]; + + if (bw_loop_count_ == BW_LOOP_COUNT_LARGE) + bw_loop_count_ = BW_LOOP_COUNT; + if (bw_skip_count_ == BW_SKIP_COUNT_LARGE) + bw_skip_count_ = BW_SKIP_COUNT; + + for (GraphElem size = (!min_size_ ? 1 : min_size_); size <= max_size_; size *= 2) + { + // memset + touch_buffers(size); + + if(size > large_msg_size_) + { + loop = bw_loop_count_large_; + skip = bw_skip_count_large_; + } + + // time communication kernel + for (int l = 0; l < loop + skip; l++) + { + if (l == skip) + { + MPI_Barrier(nbr_comm); + t_start = MPI_Wtime(); + } + + comm_kernel_bw(size, tgt_deg+1, nbr_comm, avg_ng, tgt_rank); + } + + t_end = MPI_Wtime(); + t = t_end - t_start; + + // execution time stats + MPI_Reduce(&t, &sum_t, 1, MPI_DOUBLE, MPI_SUM, 0, nbr_comm); + + double avg_t = sum_t / tgt_deg; + double avg_st = sum_t / tgt_size; + double t_sq = t*t; + double sum_tsq = 0; + MPI_Reduce(&t_sq, &sum_tsq, 1, MPI_DOUBLE, MPI_SUM, 0, nbr_comm); + + double avg_tsq = sum_tsq / tgt_size; + double var = avg_tsq - (avg_st*avg_st); + double stddev = sqrt(var); + + if (tgt_rank == 0) + { + double tmp = size / 1e6 * loop * avg_ng; + sum_t /= tgt_size; + double bw = tmp / sum_t; + + std::cout << std::setw(10) << size << std::setw(15) << bw + << std::setw(15) << 1e6 * bw / size + << std::setw(18) << var + << std::setw(16) << stddev + << std::setw(16) << stddev * ZCI / sqrt((double)avg_ng * loop) + << std::endl; + } + } + } + + // remaining processes wait on + // a barrier + MPI_Barrier(comm_); + + if (nbr_comm != MPI_COMM_NULL) + MPI_Comm_free(&nbr_comm); + + MPI_Group_free(&cur_grp); + MPI_Group_free(&nbr_grp); + } + + // Latency test (single neighborhood) + void p2p_lt_snbr(int target_nbrhood) + { + double t, t_start, t_end, sum_t = 0.0; + int loop = lt_loop_count_, skip = lt_skip_count_; + int tgt_rank = MPI_UNDEFINED, tgt_size = 0; + + assert(target_nbrhood < size_); + + // extract process neighborhood of target_nbrhood PE + int tgt_deg = outdegree_; + MPI_Bcast(&tgt_deg, 1, MPI_INT, target_nbrhood, comm_); + + std::vector exl_tgt(tgt_deg+1); + if (rank_ == target_nbrhood) + std::copy(targets_.begin(), targets_.end(), exl_tgt.begin()); + + MPI_Bcast(exl_tgt.data(), tgt_deg, MPI_INT, target_nbrhood, comm_); + exl_tgt[tgt_deg] = target_nbrhood; + + // create new group/comm + MPI_Group cur_grp, nbr_grp; + MPI_Comm nbr_comm; + + MPI_Comm_group(comm_, &cur_grp); + MPI_Group_incl(cur_grp, tgt_deg+1, exl_tgt.data(), &nbr_grp); + MPI_Comm_create(comm_, nbr_grp, &nbr_comm); + + MPI_Group_rank(nbr_grp, &tgt_rank); + MPI_Group_size(nbr_grp, &tgt_size); + + if(rank_ == target_nbrhood) + { + std::cout << "------------------------------------------" << std::endl; + std::cout << "----Latency test (single neighborhood)----" << std::endl; + std::cout << "------------------------------------------" << std::endl; + std::cout << std::setw(12) << "# Bytes" << std::setw(15) << "Lat(us)" + << std::setw(16) << "Max(us)" + << std::setw(16) << "99%(us)" + << std::setw(16) << "Variance" + << std::setw(15) << "STDDEV" + << std::setw(16) << "95% CI" + << std::endl; + } + + // start communication only if belongs to the + // chosen process neighborhood + if (tgt_rank != MPI_UNDEFINED) + { + std::vector plat(tgt_size); + int n99 = (int)std::ceil(0.99*tgt_size); + + for (GraphElem size = min_size_; size <= max_size_; size = (size ? 
size * 2 : 1)) + { + MPI_Barrier(nbr_comm); + + if(size > large_msg_size_) + { + loop = lt_loop_count_large_; + skip = lt_skip_count_large_; + } + + // time communication kernel + for (int l = 0; l < loop + skip; l++) + { + if (l == skip) + { + t_start = MPI_Wtime(); + MPI_Barrier(nbr_comm); + } + + comm_kernel_lt(size, tgt_deg+1, nbr_comm, tgt_rank); + } + + t_end = MPI_Wtime(); + t = (t_end - t_start) * 1.0e6 / (2.0 * loop); + + // execution time stats + MPI_Allreduce(&t, &sum_t, 1, MPI_DOUBLE, MPI_SUM, nbr_comm); + double t_sq = t*t; + double sum_tsq = 0; + MPI_Reduce(&t_sq, &sum_tsq, 1, MPI_DOUBLE, MPI_SUM, 0, nbr_comm); + + double avg_t = sum_t / (double)(2.0*tgt_deg); + double avg_st = sum_t / (double)(tgt_size); + double avg_tsq = sum_tsq / (double)(tgt_size); + double var = avg_tsq - (avg_st*avg_st); + double stddev = sqrt(var); + + double lmax = 0.0; + MPI_Reduce(&t, &lmax, 1, MPI_DOUBLE, MPI_MAX, 0, nbr_comm); + MPI_Gather(&t, 1, MPI_DOUBLE, plat.data(), 1, MPI_DOUBLE, 0, nbr_comm); + + if (tgt_rank == 0) + { + std::sort(plat.begin(), plat.end()); + std::cout << std::setw(10) << size << std::setw(17) << avg_t + << std::setw(16) << lmax/2.0 + << std::setw(16) << plat[n99-1]/2 + << std::setw(16) << var + << std::setw(16) << stddev + << std::setw(16) << stddev * ZCI / sqrt((double)tgt_size * loop) + << std::endl; + } + } + plat.clear(); + } + + // remaining processes wait on + // a barrier + MPI_Barrier(comm_); + + if (nbr_comm != MPI_COMM_NULL) + MPI_Comm_free(&nbr_comm); + + MPI_Group_free(&cur_grp); + MPI_Group_free(&nbr_grp); + } + + private: + Graph* g_; + GraphElem in_nghosts_, out_nghosts_, lnv_; + // ghost vertices in source/target rank + std::vector nghosts_in_target_, nghosts_in_source_; + std::unordered_map target_pindex_, source_pindex_; + + char *sbuf_, *rbuf_; + MPI_Request *sreq_, *rreq_; + + // ranges + GraphElem max_size_, min_size_, large_msg_size_; + int bw_loop_count_, bw_loop_count_large_, + bw_skip_count_, bw_skip_count_large_, + lt_loop_count_, lt_loop_count_large_, + lt_skip_count_, lt_skip_count_large_; + + float shrinkp_; // graph shrink percent + int rank_, size_, indegree_, outdegree_; + std::vector targets_, sources_; + MPI_Comm comm_, nbr_comm_; +}; + +#endif diff --git a/graph.hpp b/graph.hpp new file mode 100644 index 0000000..3e09825 --- /dev/null +++ b/graph.hpp @@ -0,0 +1,1214 @@ +// *********************************************************************** +// +// NEVE +// +// *********************************************************************** +// +// Copyright (2019) Battelle Memorial Institute +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +// FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +// COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +// BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// ************************************************************************ + +#pragma once +#ifndef GRAPH_HPP +#define GRAPH_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "utils.hpp" + +unsigned seed; + +struct Edge +{ + GraphElem tail_; + GraphWeight weight_; + + Edge(): tail_(-1), weight_(0.0) {} +}; + +struct EdgeTuple +{ + GraphElem ij_[2]; + GraphWeight w_; + + EdgeTuple(GraphElem i, GraphElem j, GraphWeight w): + ij_{i, j}, w_(w) + {} + EdgeTuple(GraphElem i, GraphElem j): + ij_{i, j}, w_(1.0) + {} + EdgeTuple(): + ij_{-1, -1}, w_(0.0) + {} +}; + +// per process graph instance +class Graph +{ + public: + Graph(): + lnv_(-1), lne_(-1), nv_(-1), + ne_(-1), comm_(MPI_COMM_WORLD) + { + MPI_Comm_size(comm_, &size_); + MPI_Comm_rank(comm_, &rank_); + } + + Graph(GraphElem lnv, GraphElem lne, + GraphElem nv, GraphElem ne, + MPI_Comm comm=MPI_COMM_WORLD): + lnv_(lnv), lne_(lne), + nv_(nv), ne_(ne), + comm_(comm) + { + MPI_Comm_size(comm_, &size_); + MPI_Comm_rank(comm_, &rank_); + + edge_indices_.resize(lnv_+1, 0); + edge_list_.resize(lne_); // this is usually populated later + + parts_.resize(size_+1); + parts_[0] = 0; + + for (GraphElem i = 1; i < size_+1; i++) + parts_[i] = ((nv_ * i) / size_); + } + + ~Graph() + { + edge_list_.clear(); + edge_indices_.clear(); + parts_.clear(); + } + + // update vertex partition information + void repart(std::vector const& parts) + { memcpy(parts_.data(), parts.data(), sizeof(GraphElem)*(size_+1)); } + + // TODO FIXME put asserts like the following + // everywhere function member of Graph class + void set_edge_index(GraphElem const vertex, GraphElem const e0) + { +#if defined(DEBUG_BUILD) + assert((vertex >= 0) && (vertex <= lnv_)); + assert((e0 >= 0) && (e0 <= lne_)); + edge_indices_.at(vertex) = e0; +#else + edge_indices_[vertex] = e0; +#endif + } + + void edge_range(GraphElem const vertex, GraphElem& e0, + GraphElem& e1) const + { + e0 = edge_indices_[vertex]; + e1 = edge_indices_[vertex+1]; + } + + // collective + void set_nedges(GraphElem lne) + { + lne_ = lne; + edge_list_.resize(lne_); + + // compute total number of edges + ne_ = 0; + MPI_Allreduce(&lne_, &ne_, 1, MPI_GRAPH_TYPE, MPI_SUM, comm_); + } + + GraphElem get_base(const int rank) const + { return parts_[rank]; } + + GraphElem get_bound(const int rank) const + { return parts_[rank+1]; } + + GraphElem get_range(const int rank) const + { return (parts_[rank+1] - parts_[rank] + 1); } + + int get_owner(const GraphElem vertex) const + { + const std::vector::const_iterator iter = + std::upper_bound(parts_.begin(), parts_.end(), vertex); + + return (iter - parts_.begin() - 1); + } + + GraphElem get_lnv() const { return lnv_; 
} + GraphElem get_lne() const { return lne_; } + GraphElem get_nv() const { return nv_; } + GraphElem get_ne() const { return ne_; } + MPI_Comm get_comm() const { return comm_; } + + // return edge and active info + // ---------------------------- + + Edge const& get_edge(GraphElem const index) const + { return edge_list_[index]; } + + Edge& set_edge(GraphElem const index) + { return edge_list_[index]; } + + // local <--> global index translation + // ----------------------------------- + GraphElem local_to_global(GraphElem idx) + { return (idx + get_base(rank_)); } + + GraphElem global_to_local(GraphElem idx) + { return (idx - get_base(rank_)); } + + // w.r.t passed rank + GraphElem local_to_global(GraphElem idx, int rank) + { return (idx + get_base(rank)); } + + GraphElem global_to_local(GraphElem idx, int rank) + { return (idx - get_base(rank)); } + + // print edge list (with weights) + void print(bool print_weight = true) const + { + if (lne_ < MAX_PRINT_NEDGE) + { + for (int p = 0; p < size_; p++) + { + MPI_Barrier(comm_); + if (p == rank_) + { + std::cout << "###############" << std::endl; + std::cout << "Process #" << p << ": " << std::endl; + std::cout << "###############" << std::endl; + GraphElem base = get_base(p); + for (GraphElem i = 0; i < lnv_; i++) + { + GraphElem e0, e1; + edge_range(i, e0, e1); + if (print_weight) { // print weights (default) + for (GraphElem e = e0; e < e1; e++) + { + Edge const& edge = get_edge(e); + std::cout << i+base << " " << edge.tail_ << " " << edge.weight_ << std::endl; + } + } + else { // don't print weights + for (GraphElem e = e0; e < e1; e++) + { + Edge const& edge = get_edge(e); + std::cout << i+base << " " << edge.tail_ << std::endl; + } + } + } + MPI_Barrier(comm_); + } + } + } + else + { + if (rank_ == 0) + std::cout << "Graph size per process is {" << lnv_ << ", " << lne_ << + "}, which will overwhelm STDOUT." 
<< std::endl; + } + } + + // print statistics about edge distribution + void print_dist_stats() + { + GraphElem sumdeg = 0, maxdeg = 0; + + MPI_Reduce(&lne_, &sumdeg, 1, MPI_GRAPH_TYPE, MPI_SUM, 0, comm_); + MPI_Reduce(&lne_, &maxdeg, 1, MPI_GRAPH_TYPE, MPI_MAX, 0, comm_); + + GraphElem my_sq = lne_*lne_; + GraphElem sum_sq = 0; + MPI_Reduce(&my_sq, &sum_sq, 1, MPI_GRAPH_TYPE, MPI_SUM, 0, comm_); + + GraphWeight average = (GraphWeight) sumdeg / size_; + GraphWeight avg_sq = (GraphWeight) sum_sq / size_; + GraphWeight var = avg_sq - (average*average); + GraphWeight stddev = sqrt(var); + + MPI_Barrier(comm_); + + if (rank_ == 0) + { + std::cout << std::endl; + std::cout << "-------------------------------------------------------" << std::endl; + std::cout << "Graph edge distribution characteristics" << std::endl; + std::cout << "-------------------------------------------------------" << std::endl; + std::cout << "Number of vertices: " << nv_ << std::endl; + std::cout << "Number of edges: " << ne_ << std::endl; + std::cout << "Maximum number of edges: " << maxdeg << std::endl; + std::cout << "Average number of edges: " << average << std::endl; + std::cout << "Expected value of X^2: " << avg_sq << std::endl; + std::cout << "Variance: " << var << std::endl; + std::cout << "Standard deviation: " << stddev << std::endl; + std::cout << "-------------------------------------------------------" << std::endl; + + } + } + + // public variables + std::vector edge_indices_; + std::vector edge_list_; + private: + GraphElem lnv_, lne_, nv_, ne_; + std::vector parts_; + MPI_Comm comm_; + int rank_, size_; +}; + +// read in binary edge list files +// using MPI I/O +class BinaryEdgeList +{ + public: + BinaryEdgeList() : + M_(-1), N_(-1), + M_local_(-1), N_local_(-1), + comm_(MPI_COMM_WORLD) + {} + BinaryEdgeList(MPI_Comm comm) : + M_(-1), N_(-1), + M_local_(-1), N_local_(-1), + comm_(comm) + {} + + // read a file and return a graph + Graph* read(int me, int nprocs, int ranks_per_node, std::string file) + { + int file_open_error; + MPI_File fh; + MPI_Status status; + + // specify the number of aggregates + MPI_Info info; + MPI_Info_create(&info); + int naggr = (ranks_per_node > 1) ? (nprocs/ranks_per_node) : ranks_per_node; + if (naggr >= nprocs) + naggr = 1; + std::stringstream tmp_str; + tmp_str << naggr; + std::string str = tmp_str.str(); + MPI_Info_set(info, "cb_nodes", str.c_str()); + + file_open_error = MPI_File_open(comm_, file.c_str(), MPI_MODE_RDONLY, info, &fh); + MPI_Info_free(&info); + + if (file_open_error != MPI_SUCCESS) + { + std::cout << " Error opening file! " << std::endl; + MPI_Abort(comm_, -99); + } + + // read the dimensions + MPI_File_read_all(fh, &M_, sizeof(GraphElem), MPI_BYTE, &status); + MPI_File_read_all(fh, &N_, sizeof(GraphElem), MPI_BYTE, &status); + M_local_ = ((M_*(me + 1)) / nprocs) - ((M_*me) / nprocs); + + // create local graph + Graph *g = new Graph(M_local_, 0, M_, N_); + + // Let N = array length and P = number of processors. 
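+      // e.g., N = 10 and P = 4 gives (via the formulas below) starting
+      // points 0, 2, 5, 7 and lengths 2, 3, 2, 3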
+ // From j = 0 to P-1, + // Starting point of array on processor j = floor(N * j / P) + // Length of array on processor j = floor(N * (j + 1) / P) - floor(N * j / P) + + uint64_t tot_bytes=(M_local_+1)*sizeof(GraphElem); + MPI_Offset offset = 2*sizeof(GraphElem) + ((M_*me) / nprocs)*sizeof(GraphElem); + + // read in INT_MAX increments if total byte size is > INT_MAX + + if (tot_bytes < INT_MAX) + MPI_File_read_at(fh, offset, &g->edge_indices_[0], tot_bytes, MPI_BYTE, &status); + else + { + int chunk_bytes=INT_MAX; + uint8_t *curr_pointer = (uint8_t*) &g->edge_indices_[0]; + uint64_t transf_bytes = 0; + + while (transf_bytes < tot_bytes) + { + MPI_File_read_at(fh, offset, curr_pointer, chunk_bytes, MPI_BYTE, &status); + transf_bytes += chunk_bytes; + offset += chunk_bytes; + curr_pointer += chunk_bytes; + + if ((tot_bytes - transf_bytes) < INT_MAX) + chunk_bytes = tot_bytes - transf_bytes; + } + } + + N_local_ = g->edge_indices_[M_local_] - g->edge_indices_[0]; + g->set_nedges(N_local_); + + tot_bytes = N_local_*(sizeof(Edge)); + offset = 2*sizeof(GraphElem) + (M_+1)*sizeof(GraphElem) + g->edge_indices_[0]*(sizeof(Edge)); + + if (tot_bytes < INT_MAX) + MPI_File_read_at(fh, offset, &g->edge_list_[0], tot_bytes, MPI_BYTE, &status); + else + { + int chunk_bytes=INT_MAX; + uint8_t *curr_pointer = (uint8_t*)&g->edge_list_[0]; + uint64_t transf_bytes = 0; + + while (transf_bytes < tot_bytes) + { + MPI_File_read_at(fh, offset, curr_pointer, chunk_bytes, MPI_BYTE, &status); + transf_bytes += chunk_bytes; + offset += chunk_bytes; + curr_pointer += chunk_bytes; + + if ((tot_bytes - transf_bytes) < INT_MAX) + chunk_bytes = (tot_bytes - transf_bytes); + } + } + + MPI_File_close(&fh); + + for(GraphElem i=1; i < M_local_+1; i++) + g->edge_indices_[i] -= g->edge_indices_[0]; + g->edge_indices_[0] = 0; + + return g; + } + + // find a distribution such that every + // process own equal number of edges (serial) + void find_balanced_num_edges(int nprocs, std::string file, std::vector& mbins) + { + FILE *fp; + GraphElem nv, ne; // #vertices, #edges + std::vector nbins(nprocs,0); + + fp = fopen(file.c_str(), "rb"); + if (fp == NULL) + { + std::cout<< " Error opening file! " << std::endl; + return; + } + + // read nv and ne + fread(&nv, sizeof(GraphElem), 1, fp); + fread(&ne, sizeof(GraphElem), 1, fp); + + // bin capacity + GraphElem nbcap = (ne / nprocs), ecount_idx, past_ecount_idx = 0; + int p = 0; + + for (GraphElem m = 0; m < nv; m++) + { + fread(&ecount_idx, sizeof(GraphElem), 1, fp); + + // bins[p] >= capacity only for the last process + if ((nbins[p] < nbcap) || (p == (nprocs - 1))) + nbins[p] += (ecount_idx - past_ecount_idx); + + // increment p as long as p is not the last process + // worst case: excess edges piled up on (p-1) + if ((nbins[p] >= nbcap) && (p < (nprocs - 1))) + p++; + + mbins[p+1]++; + past_ecount_idx = ecount_idx; + } + + fclose(fp); + + // prefix sum to store indices + for (int k = 1; k < nprocs+1; k++) + mbins[k] += mbins[k-1]; + + nbins.clear(); + } + + // read a file and return a graph + // uses a balanced distribution + // (approximately equal #edges per process) + Graph* read_balanced(int me, int nprocs, int ranks_per_node, std::string file) + { + int file_open_error; + MPI_File fh; + MPI_Status status; + std::vector mbins(nprocs+1,0); + + // find #vertices per process such that + // each process roughly owns equal #edges + if (me == 0) + { + find_balanced_num_edges(nprocs, file, mbins); + std::cout << "Trying to achieve equal edge distribution across processes." 
<< std::endl; + } + MPI_Barrier(comm_); + MPI_Bcast(mbins.data(), nprocs+1, MPI_GRAPH_TYPE, 0, comm_); + + // specify the number of aggregates + MPI_Info info; + MPI_Info_create(&info); + int naggr = (ranks_per_node > 1) ? (nprocs/ranks_per_node) : ranks_per_node; + if (naggr >= nprocs) + naggr = 1; + std::stringstream tmp_str; + tmp_str << naggr; + std::string str = tmp_str.str(); + MPI_Info_set(info, "cb_nodes", str.c_str()); + + file_open_error = MPI_File_open(comm_, file.c_str(), MPI_MODE_RDONLY, info, &fh); + MPI_Info_free(&info); + + if (file_open_error != MPI_SUCCESS) + { + std::cout << " Error opening file! " << std::endl; + MPI_Abort(comm_, -99); + } + + // read the dimensions + MPI_File_read_all(fh, &M_, sizeof(GraphElem), MPI_BYTE, &status); + MPI_File_read_all(fh, &N_, sizeof(GraphElem), MPI_BYTE, &status); + M_local_ = mbins[me+1] - mbins[me]; + + // create local graph + Graph *g = new Graph(M_local_, 0, M_, N_); + // readjust parts with new vertex partition + g->repart(mbins); + + uint64_t tot_bytes=(M_local_+1)*sizeof(GraphElem); + MPI_Offset offset = 2*sizeof(GraphElem) + mbins[me]*sizeof(GraphElem); + + // read in INT_MAX increments if total byte size is > INT_MAX + if (tot_bytes < INT_MAX) + MPI_File_read_at(fh, offset, &g->edge_indices_[0], tot_bytes, MPI_BYTE, &status); + else + { + int chunk_bytes=INT_MAX; + uint8_t *curr_pointer = (uint8_t*) &g->edge_indices_[0]; + uint64_t transf_bytes = 0; + + while (transf_bytes < tot_bytes) + { + MPI_File_read_at(fh, offset, curr_pointer, chunk_bytes, MPI_BYTE, &status); + transf_bytes += chunk_bytes; + offset += chunk_bytes; + curr_pointer += chunk_bytes; + + if ((tot_bytes - transf_bytes) < INT_MAX) + chunk_bytes = tot_bytes - transf_bytes; + } + } + + N_local_ = g->edge_indices_[M_local_] - g->edge_indices_[0]; + g->set_nedges(N_local_); + + tot_bytes = N_local_*(sizeof(Edge)); + offset = 2*sizeof(GraphElem) + (M_+1)*sizeof(GraphElem) + g->edge_indices_[0]*(sizeof(Edge)); + + if (tot_bytes < INT_MAX) + MPI_File_read_at(fh, offset, &g->edge_list_[0], tot_bytes, MPI_BYTE, &status); + else + { + int chunk_bytes=INT_MAX; + uint8_t *curr_pointer = (uint8_t*)&g->edge_list_[0]; + uint64_t transf_bytes = 0; + + while (transf_bytes < tot_bytes) + { + MPI_File_read_at(fh, offset, curr_pointer, chunk_bytes, MPI_BYTE, &status); + transf_bytes += chunk_bytes; + offset += chunk_bytes; + curr_pointer += chunk_bytes; + + if ((tot_bytes - transf_bytes) < INT_MAX) + chunk_bytes = (tot_bytes - transf_bytes); + } + } + + MPI_File_close(&fh); + + for(GraphElem i=1; i < M_local_+1; i++) + g->edge_indices_[i] -= g->edge_indices_[0]; + g->edge_indices_[0] = 0; + + mbins.clear(); + + return g; + } + + private: + GraphElem M_; + GraphElem N_; + GraphElem M_local_; + GraphElem N_local_; + MPI_Comm comm_; +}; + +// RGG graph +// 1D vertex distribution +class GenerateRGG +{ + public: + GenerateRGG(GraphElem nv, MPI_Comm comm = MPI_COMM_WORLD) + { + nv_ = nv; + comm_ = comm; + + MPI_Comm_rank(comm_, &rank_); + MPI_Comm_size(comm_, &nprocs_); + + // neighbors + up_ = down_ = MPI_PROC_NULL; + if (nprocs_ > 1) { + if (rank_ > 0 && rank_ < (nprocs_ - 1)) { + up_ = rank_ - 1; + down_ = rank_ + 1; + } + if (rank_ == 0) + down_ = 1; + if (rank_ == (nprocs_ - 1)) + up_ = rank_ - 1; + } + n_ = nv_ / nprocs_; + + // check if number of nodes is divisible by #processes + if ((nv_ % nprocs_) != 0) { + if (rank_ == 0) { + std::cout << "[ERROR] Number of vertices must be perfectly divisible by number of processes." << std::endl; + std::cout << "Exiting..." 
<< std::endl; + } + MPI_Abort(comm_, -99); + } + + // check if processes are power of 2 + if (!is_pwr2(nprocs_)) { + if (rank_ == 0) { + std::cout << "[ERROR] Number of processes must be a power of 2." << std::endl; + std::cout << "Exiting..." << std::endl; + } + MPI_Abort(comm_, -99); + } + + // calculate r(n) + GraphWeight rc = sqrt((GraphWeight)log(nv)/(GraphWeight)(PI*nv)); + GraphWeight rt = sqrt((GraphWeight)2.0736/(GraphWeight)nv); + rn_ = (rc + rt)/(GraphWeight)2.0; + + assert(((GraphWeight)1.0/(GraphWeight)nprocs_) > rn_); + + MPI_Barrier(comm_); + } + + // create RGG and returns Graph + // TODO FIXME use OpenMP wherever possible + // use Euclidean distance as edge weight + // for random edges, choose from (0,1) + // otherwise, use unit weight throughout + Graph* generate(bool isLCG, bool unitEdgeWeight = true, GraphWeight randomEdgePercent = 0.0) + { + // Generate random coordinate points + std::vector X, Y, X_up, Y_up, X_down, Y_down; + + if (isLCG) + X.resize(2*n_); + else + X.resize(n_); + + Y.resize(n_); + + if (up_ != MPI_PROC_NULL) { + X_up.resize(n_); + Y_up.resize(n_); + } + + if (down_ != MPI_PROC_NULL) { + X_down.resize(n_); + Y_down.resize(n_); + } + + // create local graph + Graph *g = new Graph(n_, 0, nv_, nv_); + + // generate random number within range + // X: 0, 1 + // Y: rank_*1/p, (rank_+1)*1/p, + GraphWeight rec_np = (GraphWeight)(1.0/(GraphWeight)nprocs_); + GraphWeight lo = rank_* rec_np; + GraphWeight hi = lo + rec_np; + assert(hi > lo); + + // measure the time to generate random numbers + MPI_Barrier(MPI_COMM_WORLD); + double st = MPI_Wtime(); + + if (!isLCG) { + // set seed (declared an extern in utils) + seed = (unsigned)reseeder(1); + +#if defined(PRINT_RANDOM_XY_COORD) + for (int k = 0; k < nprocs_; k++) { + if (k == rank_) { + std::cout << "Random number generated on Process#" << k << " :" << std::endl; + for (GraphElem i = 0; i < n_; i++) { + X[i] = genRandom(0.0, 1.0); + Y[i] = genRandom(lo, hi); + std::cout << "X, Y: " << X[i] << ", " << Y[i] << std::endl; + } + } + MPI_Barrier(comm_); + } +#else + for (GraphElem i = 0; i < n_; i++) { + X[i] = genRandom(0.0, 1.0); + Y[i] = genRandom(lo, hi); + } +#endif + } + else { // LCG + // X | Y + // e.g seeds: 1741, 3821 + // create LCG object + // seed to generate x0 + LCG xr(/*seed*/1, X.data(), 2*n_, comm_); + + // generate random numbers between 0-1 + xr.generate(); + + // rescale xr further between lo-hi + // and put the numbers in Y taking + // from X[n] + xr.rescale(Y.data(), n_, lo); + +#if defined(PRINT_RANDOM_XY_COORD) + for (int k = 0; k < nprocs_; k++) { + if (k == rank_) { + std::cout << "Random number generated on Process#" << k << " :" << std::endl; + for (GraphElem i = 0; i < n_; i++) { + std::cout << "X, Y: " << X[i] << ", " << Y[i] << std::endl; + } + } + MPI_Barrier(comm_); + } +#endif + } + + double et = MPI_Wtime(); + double tt = et - st; + double tot_tt = 0.0; + MPI_Reduce(&tt, &tot_tt, 1, MPI_DOUBLE, MPI_SUM, 0, comm_); + + if (rank_ == 0) { + double tot_avg = (tot_tt/nprocs_); + std::cout << "Average time to generate " << 2*n_ + << " random numbers using LCG (in s): " + << tot_avg << std::endl; + } + + // ghost(s) + + // cross edges, each processor + // communicates with up or/and down + // neighbor only + std::vector sendup_edges, senddn_edges; + std::vector recvup_edges, recvdn_edges; + std::vector edgeList; + + // counts, indexing: [2] = {up - 0, down - 1} + // TODO can't we use MPI_INT + std::array send_sizes = {0, 0}, recv_sizes = {0, 0}; +#if defined(CHECK_NUM_EDGES) + GraphElem 
numEdges = 0; +#endif + // local + for (GraphElem i = 0; i < n_; i++) { + for (GraphElem j = i + 1; j < n_; j++) { + // euclidean distance: + // 2D: sqrt((px-qx)^2 + (py-qy)^2) + GraphWeight dx = X[i] - X[j]; + GraphWeight dy = Y[i] - Y[j]; + GraphWeight ed = sqrt(dx*dx + dy*dy); + // are the two vertices within the range? + if (ed <= rn_) { + // local to global index + const GraphElem g_i = g->local_to_global(i); + const GraphElem g_j = g->local_to_global(j); + + if (!unitEdgeWeight) { + edgeList.emplace_back(i, g_j, ed); + edgeList.emplace_back(j, g_i, ed); + } + else { + edgeList.emplace_back(i, g_j); + edgeList.emplace_back(j, g_i); + } +#if defined(CHECK_NUM_EDGES) + numEdges += 2; +#endif + + g->edge_indices_[i+1]++; + g->edge_indices_[j+1]++; + } + } + } + + MPI_Barrier(comm_); + + // communicate ghost coordinates with neighbors + + const int x_ndown = X_down.empty() ? 0 : n_; + const int y_ndown = Y_down.empty() ? 0 : n_; + const int x_nup = X_up.empty() ? 0 : n_; + const int y_nup = Y_up.empty() ? 0 : n_; + + MPI_Sendrecv(X.data(), n_, MPI_WEIGHT_TYPE, up_, SR_X_UP_TAG, + X_down.data(), x_ndown, MPI_WEIGHT_TYPE, down_, SR_X_UP_TAG, + comm_, MPI_STATUS_IGNORE); + MPI_Sendrecv(X.data(), n_, MPI_WEIGHT_TYPE, down_, SR_X_DOWN_TAG, + X_up.data(), x_nup, MPI_WEIGHT_TYPE, up_, SR_X_DOWN_TAG, + comm_, MPI_STATUS_IGNORE); + MPI_Sendrecv(Y.data(), n_, MPI_WEIGHT_TYPE, up_, SR_Y_UP_TAG, + Y_down.data(), y_ndown, MPI_WEIGHT_TYPE, down_, SR_Y_UP_TAG, + comm_, MPI_STATUS_IGNORE); + MPI_Sendrecv(Y.data(), n_, MPI_WEIGHT_TYPE, down_, SR_Y_DOWN_TAG, + Y_up.data(), y_nup, MPI_WEIGHT_TYPE, up_, SR_Y_DOWN_TAG, + comm_, MPI_STATUS_IGNORE); + + // exchange ghost vertices / cross edges + if (nprocs_ > 1) { + if (up_ != MPI_PROC_NULL) { + + for (GraphElem i = 0; i < n_; i++) { + for (GraphElem j = i + 1; j < n_; j++) { + GraphWeight dx = X[i] - X_up[j]; + GraphWeight dy = Y[i] - Y_up[j]; + GraphWeight ed = sqrt(dx*dx + dy*dy); + + if (ed <= rn_) { + const GraphElem g_i = g->local_to_global(i); + const GraphElem g_j = j + up_*n_; + + if (!unitEdgeWeight) { + sendup_edges.emplace_back(j, g_i, ed); + edgeList.emplace_back(i, g_j, ed); + } + else { + sendup_edges.emplace_back(j, g_i); + edgeList.emplace_back(i, g_j); + } +#if defined(CHECK_NUM_EDGES) + numEdges++; +#endif + g->edge_indices_[i+1]++; + } + } + } + + // send up sizes + send_sizes[0] = sendup_edges.size(); + } + + if (down_ != MPI_PROC_NULL) { + + for (GraphElem i = 0; i < n_; i++) { + for (GraphElem j = i + 1; j < n_; j++) { + GraphWeight dx = X[i] - X_down[j]; + GraphWeight dy = Y[i] - Y_down[j]; + GraphWeight ed = sqrt(dx*dx + dy*dy); + + if (ed <= rn_) { + const GraphElem g_i = g->local_to_global(i); + const GraphElem g_j = j + down_*n_; + + if (!unitEdgeWeight) { + senddn_edges.emplace_back(j, g_i, ed); + edgeList.emplace_back(i, g_j, ed); + } + else { + senddn_edges.emplace_back(j, g_i); + edgeList.emplace_back(i, g_j); + } +#if defined(CHECK_NUM_EDGES) + numEdges++; +#endif + g->edge_indices_[i+1]++; + } + } + } + + // send down sizes + send_sizes[1] = senddn_edges.size(); + } + } + + MPI_Barrier(comm_); + + // communicate ghost vertices with neighbors + // send/recv buffer sizes + + MPI_Sendrecv(&send_sizes[0], 1, MPI_GRAPH_TYPE, up_, SR_SIZES_UP_TAG, + &recv_sizes[1], 1, MPI_GRAPH_TYPE, down_, SR_SIZES_UP_TAG, + comm_, MPI_STATUS_IGNORE); + MPI_Sendrecv(&send_sizes[1], 1, MPI_GRAPH_TYPE, down_, SR_SIZES_DOWN_TAG, + &recv_sizes[0], 1, MPI_GRAPH_TYPE, up_, SR_SIZES_DOWN_TAG, + comm_, MPI_STATUS_IGNORE); + + // resize recv buffers + + if 
(recv_sizes[0] > 0) + recvup_edges.resize(recv_sizes[0]); + if (recv_sizes[1] > 0) + recvdn_edges.resize(recv_sizes[1]); + + // send/recv both up and down + + MPI_Sendrecv(sendup_edges.data(), send_sizes[0]*sizeof(struct EdgeTuple), MPI_BYTE, + up_, SR_UP_TAG, recvdn_edges.data(), recv_sizes[1]*sizeof(struct EdgeTuple), + MPI_BYTE, down_, SR_UP_TAG, comm_, MPI_STATUS_IGNORE); + MPI_Sendrecv(senddn_edges.data(), send_sizes[1]*sizeof(struct EdgeTuple), MPI_BYTE, + down_, SR_DOWN_TAG, recvup_edges.data(), recv_sizes[0]*sizeof(struct EdgeTuple), + MPI_BYTE, up_, SR_DOWN_TAG, comm_, MPI_STATUS_IGNORE); + + // update local #edges + + // down + if (down_ != MPI_PROC_NULL) { + for (GraphElem i = 0; i < recv_sizes[1]; i++) { +#if defined(CHECK_NUM_EDGES) + numEdges++; +#endif + if (!unitEdgeWeight) + edgeList.emplace_back(recvdn_edges[i].ij_[0], recvdn_edges[i].ij_[1], recvdn_edges[i].w_); + else + edgeList.emplace_back(recvdn_edges[i].ij_[0], recvdn_edges[i].ij_[1]); + g->edge_indices_[recvdn_edges[i].ij_[0]+1]++; + } + } + + // up + if (up_ != MPI_PROC_NULL) { + for (GraphElem i = 0; i < recv_sizes[0]; i++) { +#if defined(CHECK_NUM_EDGES) + numEdges++; +#endif + if (!unitEdgeWeight) + edgeList.emplace_back(recvup_edges[i].ij_[0], recvup_edges[i].ij_[1], recvup_edges[i].w_); + else + edgeList.emplace_back(recvup_edges[i].ij_[0], recvup_edges[i].ij_[1]); + g->edge_indices_[recvup_edges[i].ij_[0]+1]++; + } + } + + // add random edges based on + // randomEdgePercent + if (randomEdgePercent > 0.0) { + const GraphElem pnedges = (edgeList.size()/2); + GraphElem tot_pnedges = 0; + + MPI_Allreduce(&pnedges, &tot_pnedges, 1, MPI_GRAPH_TYPE, MPI_SUM, comm_); + + // extra #edges per process + const GraphElem nrande = ((GraphElem)(randomEdgePercent * (GraphWeight)tot_pnedges)/100); + + GraphElem pnrande; + + // TODO FIXME try to ensure a fair edge distibution + if (nrande < nprocs_) { + if (rank_ == (nprocs_ - 1)) + pnrande += nrande; + } + else { + pnrande = nrande / nprocs_; + const GraphElem pnrem = nrande % nprocs_; + if (pnrem != 0) { + if (rank_ == (nprocs_ - 1)) + pnrande += pnrem; + } + } + + // add pnrande edges + + // send/recv buffers + std::vector> rand_edges(nprocs_); + std::vector sendrand_edges, recvrand_edges; + + // outgoing/incoming send/recv sizes + // TODO FIXME if number of randomly added edges are above + // INT_MAX, weird things will happen, fix it + std::vector sendrand_sizes(nprocs_), recvrand_sizes(nprocs_); + +#if defined(PRINT_EXTRA_NEDGES) + int extraEdges = 0; +#endif + +#if defined(DEBUG_PRINTF) + for (int i = 0; i < nprocs_; i++) { + if (i == rank_) { + std::cout << "[" << i << "]Target process for random edge insertion between " + << lo << " and " << hi << std::endl; + } + MPI_Barrier(comm_); + } +#endif + // make sure each process has a + // different seed this time since + // we want random edges + unsigned rande_seed = (unsigned)(time(0)^getpid()); + GraphWeight weight = 1.0; + std::hash reh; + + // cannot use genRandom if it's already been seeded + std::default_random_engine re(rande_seed); + std::uniform_int_distribution IR, JR; + std::uniform_real_distribution IJW; + + for (GraphElem k = 0; k < pnrande; k++) { + + // randomly pick start/end vertex and target from my list + const GraphElem i = (GraphElem)IR(re, std::uniform_int_distribution::param_type{0, (n_- 1)}); + const GraphElem g_j = (GraphElem)JR(re, std::uniform_int_distribution::param_type{0, (nv_- 1)}); + const int target = g->get_owner(g_j); + const GraphElem j = g->global_to_local(g_j, target); // local + + if 
(i == j) + continue; + + const GraphElem g_i = g->local_to_global(i); + + // check for duplicates prior to edgeList insertion + auto found = std::find_if(edgeList.begin(), edgeList.end(), + [&](EdgeTuple const& et) + { return ((et.ij_[0] == i) && (et.ij_[1] == g_j)); }); + + // OK to insert, not in list + if (found == std::end(edgeList)) { + + // calculate weight + if (!unitEdgeWeight) { + if (target == rank_) { + GraphWeight dx = X[i] - X[j]; + GraphWeight dy = Y[i] - Y[j]; + weight = sqrt(dx*dx + dy*dy); + } + else if (target == up_) { + GraphWeight dx = X[i] - X_up[j]; + GraphWeight dy = Y[i] - Y_up[j]; + weight = sqrt(dx*dx + dy*dy); + } + else if (target == down_) { + GraphWeight dx = X[i] - X_down[j]; + GraphWeight dy = Y[i] - Y_down[j]; + weight = sqrt(dx*dx + dy*dy); + } + else { + unsigned randw_seed = reh((GraphElem)(g_i*nv_+g_j)); + std::default_random_engine rew(randw_seed); + weight = (GraphWeight)IJW(rew, std::uniform_real_distribution::param_type{0.01, 1.0}); + } + } + + rand_edges[target].emplace_back(j, g_i, weight); + sendrand_sizes[target]++; + +#if defined(PRINT_EXTRA_NEDGES) + extraEdges++; +#endif +#if defined(CHECK_NUM_EDGES) + numEdges++; +#endif + edgeList.emplace_back(i, g_j, weight); + g->edge_indices_[i+1]++; + } + } + +#if defined(PRINT_EXTRA_NEDGES) + int totExtraEdges = 0; + MPI_Reduce(&extraEdges, &totExtraEdges, 1, MPI_INT, MPI_SUM, 0, comm_); + if (rank_ == 0) + std::cout << "Adding extra " << totExtraEdges << " edges while trying to incorporate " + << randomEdgePercent << "%" << " extra edges globally." << std::endl; +#endif + + MPI_Barrier(comm_); + + // communicate ghosts edges + MPI_Request rande_sreq; + + MPI_Ialltoall(sendrand_sizes.data(), 1, MPI_INT, + recvrand_sizes.data(), 1, MPI_INT, comm_, + &rande_sreq); + + // send data if outgoing size > 0 + for (int p = 0; p < nprocs_; p++) { + sendrand_edges.insert(sendrand_edges.end(), + rand_edges[p].begin(), rand_edges[p].end()); + } + + MPI_Wait(&rande_sreq, MPI_STATUS_IGNORE); + + // total recvbuffer size + const int rcount = std::accumulate(recvrand_sizes.begin(), recvrand_sizes.end(), 0); + recvrand_edges.resize(rcount); + + // alltoallv for incoming data + // TODO FIXME make sure size of extra edges is + // within INT limits + + int rpos = 0, spos = 0; + std::vector sdispls(nprocs_), rdispls(nprocs_); + + for (int p = 0; p < nprocs_; p++) { + + sendrand_sizes[p] *= sizeof(struct EdgeTuple); + recvrand_sizes[p] *= sizeof(struct EdgeTuple); + + sdispls[p] = spos; + rdispls[p] = rpos; + + spos += sendrand_sizes[p]; + rpos += recvrand_sizes[p]; + } + + MPI_Alltoallv(sendrand_edges.data(), sendrand_sizes.data(), sdispls.data(), + MPI_BYTE, recvrand_edges.data(), recvrand_sizes.data(), rdispls.data(), + MPI_BYTE, comm_); + + // update local edge list + for (int i = 0; i < rcount; i++) { +#if defined(CHECK_NUM_EDGES) + numEdges++; +#endif + edgeList.emplace_back(recvrand_edges[i].ij_[0], recvrand_edges[i].ij_[1], recvrand_edges[i].w_); + g->edge_indices_[recvrand_edges[i].ij_[0]+1]++; + } + + sendrand_edges.clear(); + recvrand_edges.clear(); + rand_edges.clear(); + } // end of (conditional) random edges addition + + MPI_Barrier(comm_); + + // set graph edge indices + + std::vector ecTmp(n_+1); + std::partial_sum(g->edge_indices_.begin(), g->edge_indices_.end(), ecTmp.begin()); + g->edge_indices_ = ecTmp; + + for(GraphElem i = 1; i < n_+1; i++) + g->edge_indices_[i] -= g->edge_indices_[0]; + g->edge_indices_[0] = 0; + + g->set_edge_index(0, 0); + for (GraphElem i = 0; i < n_; i++) + g->set_edge_index(i+1, 
g->edge_indices_[i+1]); + + const GraphElem nedges = g->edge_indices_[n_] - g->edge_indices_[0]; + g->set_nedges(nedges); + + // set graph edge list + // sort edge list + auto ecmp = [] (EdgeTuple const& e0, EdgeTuple const& e1) + { return ((e0.ij_[0] < e1.ij_[0]) || ((e0.ij_[0] == e1.ij_[0]) && (e0.ij_[1] < e1.ij_[1]))); }; + + if (!std::is_sorted(edgeList.begin(), edgeList.end(), ecmp)) { +#if defined(DEBUG_PRINTF) + std::cout << "Edge list is not sorted." << std::endl; +#endif + std::sort(edgeList.begin(), edgeList.end(), ecmp); + } +#if defined(DEBUG_PRINTF) + else + std::cout << "Edge list is sorted!" << std::endl; +#endif + + GraphElem ePos = 0; + for (GraphElem i = 0; i < n_; i++) { + GraphElem e0, e1; + + g->edge_range(i, e0, e1); +#if defined(DEBUG_PRINTF) + if ((i % 100000) == 0) + std::cout << "Processing edges for vertex: " << i << ", range(" << e0 << ", " << e1 << + ")" << std::endl; +#endif + for (GraphElem j = e0; j < e1; j++) { + Edge &edge = g->set_edge(j); + + assert(ePos == j); + assert(i == edgeList[ePos].ij_[0]); + + edge.tail_ = edgeList[ePos].ij_[1]; + edge.weight_ = edgeList[ePos].w_; + + ePos++; + } + } + +#if defined(CHECK_NUM_EDGES) + GraphElem tot_numEdges = 0; + MPI_Allreduce(&numEdges, &tot_numEdges, 1, MPI_GRAPH_TYPE, MPI_SUM, comm_); + const GraphElem tne = g->get_ne(); + assert(tne == tot_numEdges); +#endif + edgeList.clear(); + + X.clear(); + Y.clear(); + X_up.clear(); + Y_up.clear(); + X_down.clear(); + Y_down.clear(); + + sendup_edges.clear(); + senddn_edges.clear(); + recvup_edges.clear(); + recvdn_edges.clear(); + + return g; + } + + GraphWeight get_d() const { return rn_; } + GraphElem get_nv() const { return nv_; } + + private: + GraphElem nv_, n_; + GraphWeight rn_; + MPI_Comm comm_; + int nprocs_, rank_, up_, down_; +}; + +#endif diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..b6fedff --- /dev/null +++ b/main.cpp @@ -0,0 +1,424 @@ +// *********************************************************************** +// +// NEVE +// +// *********************************************************************** +// +// Copyright (2019) Battelle Memorial Institute +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +// FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE +// COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +// BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// ************************************************************************ + + + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +#include +#include "comm.hpp" + +static std::string inputFileName; +static int me, nprocs; +static int ranksPerNode = 1; +static GraphElem nvRGG = 0; +static int generateGraph = 0; + +static GraphWeight randomEdgePercent = 0.0; +static long minSizeExchange = 0; +static long maxSizeExchange = 0; +static long maxNumGhosts = 0; +static bool readBalanced = false; +static bool hardSkip = false; +static bool randomNumberLCG = false; +static bool fallAsleep = false; + +static int lttOption = 0; +static bool performBWTest = false; +static bool performLTTest = false; +static bool performLTTestNbrAlltoAll = false; +static bool performLTTestNbrAllGather = false; + +static bool chooseSingleNbr = false; +static int processNbr = 0; +static bool shrinkGraph = false; +static float graphShrinkPercent = 0; + +// parse command line parameters +static void parseCommandLine(const int argc, char * const argv[]); + +int main(int argc, char *argv[]) +{ + double t0, t1, td, td0, td1; + + MPI_Init(&argc, &argv); +#if defined(SCOREP_USER_ENABLE) + SCOREP_RECORDING_OFF(); +#endif + MPI_Comm_size(MPI_COMM_WORLD, &nprocs); + MPI_Comm_rank(MPI_COMM_WORLD, &me); + + // command line options + parseCommandLine(argc, argv); + + Graph* g = nullptr; + + td0 = MPI_Wtime(); + + // generate graph only supports RGG as of now + if (generateGraph) + { + GenerateRGG gr(nvRGG); + g = gr.generate(randomNumberLCG, true /*isUnitEdgeWeight*/, randomEdgePercent); + } + else + { // read input graph + BinaryEdgeList rm; + if (readBalanced == true) + { + if (me == 0) + { + std::cout << std::endl; + std::cout << "Trying to balance the edge distribution while reading: " << std::endl; + std::cout << inputFileName << std::endl; + } + g = rm.read_balanced(me, nprocs, ranksPerNode, inputFileName); + } + else + g = rm.read(me, nprocs, ranksPerNode, inputFileName); + } + +#if defined(PRINT_GRAPH_EDGES) + g->print(); +#endif + g->print_dist_stats(); + assert(g != nullptr); + + MPI_Barrier(MPI_COMM_WORLD); +#ifdef DEBUG_PRINTF + assert(g); +#endif + td1 = MPI_Wtime(); + td = td1 - td0; + + double tdt = 0.0; + MPI_Reduce(&td, &tdt, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD); + + if (me == 0) + { + if (!generateGraph) + std::cout << "Time to read input file and create distributed graph (in s): " + << tdt << std::endl; + else + std::cout << "Time to generate distributed graph of " + << nvRGG << " vertices (in s): " << tdt << std::endl; + } + + // Comm object can be instantiated + // with iteration ranges and other + // info, see class Comm in comm.hpp + if (maxSizeExchange == 0) + maxSizeExchange = MAX_SIZE; + if (minSizeExchange == 0) + minSizeExchange = MIN_SIZE; + + Comm c(g, minSizeExchange, maxSizeExchange, graphShrinkPercent); + + MPI_Barrier(MPI_COMM_WORLD); + + t0 = MPI_Wtime(); + + // bandwidth test + if (performBWTest) + { + if 
(chooseSingleNbr) + { + if (me == 0) + { + std::cout << "Choosing the neighborhood of process #" << processNbr + << " for bandwidth test." << std::endl; + } + if (maxNumGhosts > 0) + c.p2p_bw_snbr(processNbr, maxNumGhosts); + else + c.p2p_bw_snbr(processNbr); + } + else + { + if (hardSkip) + c.p2p_bw_hardskip(); + else + c.p2p_bw(); + } + } + + // latency tests + if (performLTTest || performLTTestNbrAlltoAll || performLTTestNbrAllGather) + { + if (performLTTest) + { + if (chooseSingleNbr) + { + if (me == 0) + { + std::cout << "Choosing the neighborhood of process #" << processNbr + << " for latency test." << std::endl; + } + c.p2p_lt_snbr(processNbr); + } + else { + if (fallAsleep) { + if (me == 0) + std::cout << "Invoking (u)sleep for an epoch equal to #locally-owned-vertices" << std::endl; + c.p2p_lt_usleep(); + } + else + c.p2p_lt(); + } + } + + if (performLTTestNbrAlltoAll) + { + if (chooseSingleNbr) + { + if (me == 0) + { + std::cout << "Choosing the neighborhood of process #" << processNbr + << " for latency test (using MPI_Isend/Irecv)." << std::endl; + } + c.p2p_lt_snbr(processNbr); + } + else + c.nbr_ala_lt(); + } + + if (performLTTestNbrAllGather) + { + if (chooseSingleNbr) + { + if (me == 0) + { + std::cout << "Choosing the neighborhood of process #" << processNbr + << " for latency test (using MPI_Isend/Irecv)." << std::endl; + } + c.p2p_lt_snbr(processNbr); + } + else + c.nbr_aga_lt(); + } + } + + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + double p_tot = t1 - t0, t_tot = 0.0; + + MPI_Reduce(&p_tot, &t_tot, 1, MPI_DOUBLE, + MPI_SUM, 0, MPI_COMM_WORLD); + if (me == 0) + { + std::cout << "Average execution time (in s) for running the test on " << nprocs << " processes: " + << (double)(t_tot/(double)nprocs) << std::endl; + std::cout << "Resolution of MPI_Wtime: " << MPI_Wtick() << std::endl; + } + + c.destroy_nbr_comm(); + + MPI_Barrier(MPI_COMM_WORLD); + + MPI_Finalize(); + + return 0; +} + +void parseCommandLine(const int argc, char * const argv[]) +{ + int ret; + + while ((ret = getopt(argc, argv, "f:r:n:lhp:m:x:bg:t:ws:z:u")) != -1) { + switch (ret) { + case 'f': + inputFileName.assign(optarg); + break; + case 'b': + readBalanced = true; + break; + case 'r': + ranksPerNode = atoi(optarg); + break; + case 'n': + nvRGG = atol(optarg); + if (nvRGG > 0) + generateGraph = true; + break; + case 'l': + randomNumberLCG = true; + break; + case 'p': + randomEdgePercent = atof(optarg); + break; + case 'x': + maxSizeExchange = atol(optarg); + break; + case 'm': + minSizeExchange = atol(optarg); + break; + case 'g': + maxNumGhosts = atol(optarg); + break; + case 'w': + performBWTest = true; + break; + case 't': + lttOption = atoi(optarg); + if (lttOption == 0) + performLTTest = true; + else if (lttOption == 1) + performLTTestNbrAlltoAll = true; + else if (lttOption == 2) + performLTTestNbrAllGather = true; + else + performLTTest = true; + break; + case 'h': + hardSkip = true; + break; + case 's': + chooseSingleNbr = true; + processNbr = atoi(optarg); + break; + case 'z': + shrinkGraph = true; + graphShrinkPercent = atof(optarg); + break; + case 'u': + fallAsleep = true; + break; + default: + assert(0 && "Should not reach here!!"); + break; + } + } + + // warnings/info + if (me == 0 && (performLTTest || performLTTestNbrAlltoAll || performLTTestNbrAllGather) && maxNumGhosts) + { + std::cout << "Setting the number of ghost vertices (-g <...>) has no effect for latency test." 
+ << std::endl; + } + + if (me == 0 && generateGraph && readBalanced) + { + std::cout << "Balanced read (option -b) is only applicable for real-world graphs. " + << "This option does nothing for generated (synthetic) graphs." << std::endl; + } + + if (me == 0 && generateGraph && shrinkGraph && graphShrinkPercent > 0.0) + { + std::cout << "Graph shrinking (option -z) is only applicable for real-world graphs. " + << "This option does nothing for generated (synthetic) graphs." << std::endl; + } + + if (me == 0 && shrinkGraph && graphShrinkPercent <= 0.0) + { + std::cout << "Graph shrinking (option -z) must be greater than 0.0. " << std::endl; + } + + if (me == 0 && shrinkGraph && (performLTTest || performLTTestNbrAlltoAll || performLTTestNbrAllGather)) + { + std::cout << "Graph shrinking is ONLY valid for bandwidth test, NOT latency test which just performs message exchanges across the process neighborhood of a graph." << std::endl; + } + + if (me == 0 && (performLTTest || performLTTestNbrAlltoAll || performLTTestNbrAllGather) && hardSkip) + { + std::cout << "The hard skip option to disable warmup and extra communication loops only affects the bandwidth test." << std::endl; + } + + if (me == 0 && chooseSingleNbr && (performLTTestNbrAlltoAll || performLTTestNbrAllGather)) + { + std::cout << "At present, only MPI Isend/Irecv communication is supported when a single process's neighborhood is selected.." << std::endl; + } + + if (me == 0 && lttOption > 2) + { + std::cout << "Valid values for latency test arguments are 0 (Isend/Irecv, the default case), 1 (Neighbor All-to-All) and 2 (Neighbor All-Gather)." << std::endl; + } + + // errors + if (me == 0 && (argc == 1)) + { + std::cerr << "Must specify some options." << std::endl; + MPI_Abort(MPI_COMM_WORLD, -99); + } + + if (me == 0 && !generateGraph && inputFileName.empty()) + { + std::cerr << "Must specify a binary file name with -f or provide parameters for generating a graph." << std::endl; + MPI_Abort(MPI_COMM_WORLD, -99); + } + + if (me == 0 && !generateGraph && randomNumberLCG) + { + std::cerr << "Must specify -g for graph generation using LCG." << std::endl; + MPI_Abort(MPI_COMM_WORLD, -99); + } + + if (me == 0 && !generateGraph && (randomEdgePercent > 0.0)) + { + std::cerr << "Must specify -g for graph generation first to add random edges to it." << std::endl; + MPI_Abort(MPI_COMM_WORLD, -99); + } + + if (me == 0 && generateGraph && ((randomEdgePercent < 0.0) || (randomEdgePercent >= 100.0))) + { + std::cerr << "Invalid random edge percentage for generated graph!" << std::endl; + MPI_Abort(MPI_COMM_WORLD, -99); + } + + if (me == 0 && !chooseSingleNbr && (maxNumGhosts > 0)) + { + std::cerr << "Fixing ghosts only allowed when a single neighborhood (-s <...>) is chosen." << std::endl; + MPI_Abort(MPI_COMM_WORLD, -99); + } + + if (me == 0 && !generateGraph && shrinkGraph && (graphShrinkPercent != 0.0 && (graphShrinkPercent < 0.0 || graphShrinkPercent > 100.0))) + { + std::cerr << "Allowable value of graph shrink percentage is 0.0...-100%." << std::endl; + MPI_Abort(MPI_COMM_WORLD, -99); + } +} // parseCommandLine diff --git a/utils.hpp b/utils.hpp new file mode 100644 index 0000000..fbe4953 --- /dev/null +++ b/utils.hpp @@ -0,0 +1,329 @@ +// *********************************************************************** +// +// NEVE +// +// *********************************************************************** +// +// Copyright (2019) Battelle Memorial Institute +// All rights reserved. 
+// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +// FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +// COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +// BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// ************************************************************************ + + +#pragma once +#ifndef UTILS_HPP +#define UTILS_HPP + +#define PI (3.14159) +#define MAX_PRINT_NEDGE (100000) + +// Read https://en.wikipedia.org/wiki/Linear_congruential_generator#Period_length +// about choice of LCG parameters +// From numerical recipes +// TODO FIXME investigate larger periods +#define MLCG (2147483647) // 2^31 - 1 +#define ALCG (16807) // 7^5 +#define BLCG (0) + +#define SR_UP_TAG 100 +#define SR_DOWN_TAG 101 +#define SR_SIZES_UP_TAG 102 +#define SR_SIZES_DOWN_TAG 103 +#define SR_X_UP_TAG 104 +#define SR_X_DOWN_TAG 105 +#define SR_Y_UP_TAG 106 +#define SR_Y_DOWN_TAG 107 +#define SR_LCG_TAG 108 + +#include +#include +#include + +#ifdef USE_32_BIT_GRAPH +using GraphElem = int32_t; +using GraphWeight = float; +const MPI_Datatype MPI_GRAPH_TYPE = MPI_INT32_T; +const MPI_Datatype MPI_WEIGHT_TYPE = MPI_FLOAT; +#else +using GraphElem = int64_t; +using GraphWeight = double; +const MPI_Datatype MPI_GRAPH_TYPE = MPI_INT64_T; +const MPI_Datatype MPI_WEIGHT_TYPE = MPI_DOUBLE; +#endif + +extern unsigned seed; + +// Is nprocs a power-of-2? 
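+// A power of two has exactly one set bit, so nprocs & (nprocs - 1) is zero
+// exactly when nprocs is 0 or a power of two; the (nprocs != 0) test below
+// rules out zero.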
+int is_pwr2(int nprocs) +{ return ((nprocs != 0) && !(nprocs & (nprocs - 1))); } + +// return unint32_t seed +GraphElem reseeder(unsigned initseed) +{ + std::seed_seq seq({initseed}); + std::vector seeds(1); + seq.generate(seeds.begin(), seeds.end()); + + return (GraphElem)seeds[0]; +} + +// Local random number generator +template +T genRandom(T lo, T hi) +{ + thread_local static G gen(seed); + using Dist = typename std::conditional + < + std::is_integral::value + , std::uniform_int_distribution + , std::uniform_real_distribution + >::type; + + thread_local static Dist utd {}; + return utd(gen, typename Dist::param_type{lo, hi}); +} + +// Parallel Linear Congruential Generator +// x[i] = (a*x[i-1] + b)%M +class LCG +{ + public: + LCG(unsigned seed, GraphWeight* drand, + GraphElem n, MPI_Comm comm = MPI_COMM_WORLD): + seed_(seed), drand_(drand), n_(n) + { + comm_ = comm; + MPI_Comm_size(comm_, &nprocs_); + MPI_Comm_rank(comm_, &rank_); + + // allocate long random numbers + rnums_.resize(n_); + + // init x0 + if (rank_ == 0) + x0_ = reseeder(seed_); + + // step #1: bcast x0 from root + MPI_Bcast(&x0_, 1, MPI_GRAPH_TYPE, 0, comm_); + + // step #2: parallel prefix to generate first random value per process + parallel_prefix_op(); + } + + ~LCG() { rnums_.clear(); } + + // matrix-matrix multiplication for 2x2 matrices + void matmat_2x2(GraphElem c[], GraphElem a[], GraphElem b[]) + { + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + GraphElem sum = 0; + for (int k = 0; k < 2; k++) { + sum += a[i*2+k]*b[k*2+j]; + } + c[i*2+j] = sum; + } + } + } + + // x *= y + void matop_2x2(GraphElem x[], GraphElem y[]) + { + GraphElem tmp[4]; + matmat_2x2(tmp, x, y); + memcpy(x, tmp, sizeof(GraphElem[4])); + } + + // find kth power of a 2x2 matrix + void mat_power(GraphElem mat[], GraphElem k) + { + GraphElem tmp[4]; + memcpy(tmp, mat, sizeof(GraphElem[4])); + + // mat-mat multiply k times + for (GraphElem p = 0; p < k-1; p++) + matop_2x2(mat, tmp); + } + + // parallel prefix for matrix-matrix operation + // `x0 is the very first random number in the series + // `ab is a 2-length array which stores a and b + // `n_ is (n/p) + // `rnums is n_ length array which stores the random nums for a process + void parallel_prefix_op() + { + GraphElem global_op[4]; + global_op[0] = ALCG; + global_op[1] = 0; + global_op[2] = BLCG; + global_op[3] = 1; + + mat_power(global_op, n_); // M^(n/p) + GraphElem prefix_op[4] = {1,0,0,1}; // I in row-major + + GraphElem global_op_recv[4]; + + int steps = (int)(log2((double)nprocs_)); + + for (int s = 0; s < steps; s++) { + + int mate = rank_^(1 << s); // toggle the sth LSB to find my neighbor + + // send/recv global to/from mate + MPI_Sendrecv(global_op, 4, MPI_GRAPH_TYPE, mate, SR_LCG_TAG, + global_op_recv, 4, MPI_GRAPH_TYPE, mate, SR_LCG_TAG, + comm_, MPI_STATUS_IGNORE); + + matop_2x2(global_op, global_op_recv); + + if (mate < rank_) + matop_2x2(prefix_op, global_op_recv); + + MPI_Barrier(comm_); + } + + // populate the first random number entry for each process + // (x0*a + b)%P + if (rank_ == 0) + rnums_[0] = x0_; + else + rnums_[0] = (x0_*prefix_op[0] + prefix_op[2])%MLCG; + } + + // generate random number based on the first + // random number on a process + // TODO check the 'quick'n dirty generators to + // see if we can avoid the mod + void generate() + { +#if defined(PRINT_LCG_LONG_RANDOM_NUMBERS) + for (int k = 0; k < nprocs_; k++) { + if (k == rank_) { + std::cout << "------------" << std::endl; + std::cout << "Process#" << rank_ << " :" << std::endl; + 
std::cout << "------------" << std::endl; + std::cout << rnums_[0] << std::endl; + for (GraphElem i = 1; i < n_; i++) { + rnums_[i] = (rnums_[i-1]*ALCG + BLCG)%MLCG; + std::cout << rnums_[i] << std::endl; + } + } + MPI_Barrier(comm_); + } +#else + for (GraphElem i = 1; i < n_; i++) { + rnums_[i] = (rnums_[i-1]*ALCG + BLCG)%MLCG; + } +#endif + GraphWeight mult = 1.0 / (GraphWeight)(1.0 + (GraphWeight)(MLCG-1)); + +#if defined(PRINT_LCG_DOUBLE_RANDOM_NUMBERS) + for (int k = 0; k < nprocs_; k++) { + if (k == rank_) { + std::cout << "------------" << std::endl; + std::cout << "Process#" << rank_ << " :" << std::endl; + std::cout << "------------" << std::endl; + + for (GraphElem i = 0; i < n_; i++) { + drand_[i] = (GraphWeight)((GraphWeight)fabs(rnums_[i]) * mult ); // 0-1 + std::cout << drand_[i] << std::endl; + } + } + MPI_Barrier(comm_); + } +#else + for (GraphElem i = 0; i < n_; i++) + drand_[i] = (GraphWeight)((GraphWeight)fabs(rnums_[i]) * mult); // 0-1 +#endif + } + + // copy from drand_[idx_start] to new_drand, + // rescale the random numbers between lo and hi + void rescale(GraphWeight* new_drand, GraphElem idx_start, GraphWeight const& lo) + { + GraphWeight range = (1.0 / (GraphWeight)nprocs_); + +#if defined(PRINT_LCG_DOUBLE_LOHI_RANDOM_NUMBERS) + for (int k = 0; k < nprocs_; k++) { + if (k == rank_) { + std::cout << "------------" << std::endl; + std::cout << "Process#" << rank_ << " :" << std::endl; + std::cout << "------------" << std::endl; + + for (GraphElem i = idx_start, j = 0; i < n_; i++, j++) { + new_drand[j] = lo + (GraphWeight)(range * drand_[i]); + std::cout << new_drand[j] << std::endl; + } + } + MPI_Barrier(comm_); + } +#else + for (GraphElem i = idx_start, j = 0; i < n_; i++, j++) + new_drand[j] = lo + (GraphWeight)(range * drand_[i]); // lo-hi +#endif + } + + private: + MPI_Comm comm_; + int nprocs_, rank_; + unsigned seed_; + GraphElem n_, x0_; + GraphWeight* drand_; + std::vector rnums_; +}; + +// locks +#ifdef USE_OPENMP_LOCK +#else +#ifdef USE_SPINLOCK +#include +std::atomic_flag lkd_ = ATOMIC_FLAG_INIT; +#else +#include +std::mutex mtx_; +#endif +void lock() { +#ifdef USE_SPINLOCK + while (lkd_.test_and_set(std::memory_order_acquire)) { ; } +#else + mtx_.lock(); +#endif +} +void unlock() { +#ifdef USE_SPINLOCK + lkd_.clear(std::memory_order_release); +#else + mtx_.unlock(); +#endif +} +#endif + +#endif // UTILS