Merge pull request #4 from mendygral/feature/dl-congestor
adding in a DL training congestor based off the Rice Data Science 201…
Peter Mendygral authored Apr 6, 2020
2 parents 03aa329 + a45f1be commit 52e4506
Showing 6 changed files with 239 additions and 130 deletions.
18 changes: 12 additions & 6 deletions README.md
@@ -2,7 +2,7 @@

This benchmark suite consists of two applications:

network_test: Full system network tests in random and natural ring, alltoall
and allreduce

network_load_test: Select full system network tests run with four congestors to
@@ -45,15 +45,15 @@ or

aprun -n 2304 -N 36 ./network_load_test

Each application has no arguments.

# Benchmarking Practices

GPCNeT applications should be run at full system scale, in particular
network_load_test. network_test can be run at any scale above 2 nodes to
measure the capability of a network for complex communication patterns.

network_load_test should not be run at much less than full system scale
(i.e., run on at least 95% of system nodes). The results
will likely not be representative if the network has significant headroom. Additionally,
the spirit of this benchmark is that it is run with default network and MPI configuration
@@ -62,8 +62,8 @@ network and MPI configuration is used, the baseline performance for communicatio
inspected with network_test prior to measuring congestion impacts with network_load_test.

The primary tuning parameter users can use is the number of processes per NIC (PPN).
We refer to processes per NIC (rather than processes per node) because modern nodes span a wide range of capabilities.
Consider a dual-socket node with 2 NICs and 6 GPUs vs. a single-socket, single-NIC node.
The number of NICs is a reasonable proxy for expected communication capability.
The higher the PPN, the more the benchmark will push the network. For network_test,
higher PPN will push bandwidth per NIC (note the benchmark reports bandwidth per rank)
@@ -95,7 +95,7 @@ congestors can be lessened by reducing the number of processes per NIC or
modifying the message sizes of congestors.

Tuning of message sizes and loop counts is done with the defs at the beginning of
network_test.c or network_load_test.c. For example, to modify the message size
of the one-sided incast, look for this line in network_load_test.c:

#define INCAST_MSG_COUNT 512
@@ -109,3 +109,9 @@ Please contact any of the following people if you have any questions.
* Taylor Groves ([email protected])
* Sudheer Chunduri ([email protected])
* Pete Mendygral ([email protected])

# ChangeLog #

4/3/2020: Added a new congestor type that mimics the allreduce operations used in DL training. It is based on
the paper presented at the 2019 Rice Data Science Conference:
https://2019datascienceconference.sched.com/event/UYuZ/sharing-resources-in-the-age-of-deep-learning
83 changes: 53 additions & 30 deletions congestors.c
@@ -22,23 +22,23 @@
int p2p_incast_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int comm_ranks)
{
int i;
MPI_Request *request_list = NULL;
request_list = malloc(sizeof(MPI_Request) * comm_ranks);
if (request_list == NULL) {
die("Failed to allocate request_list in p2p_incast_congestor()\n");
}

if(myrank == 0) {

for(i=1; i < comm_ranks; i++) {
mpi_error(MPI_Irecv(&config->a2a_rbuffer[i * config->incast_cnt], config->incast_cnt,
MPI_DOUBLE, i, 987, comm, &request_list[i-1]));
}
mpi_error(MPI_Waitall(comm_ranks-1, request_list, MPI_STATUSES_IGNORE));

} else {

mpi_error(MPI_Send(config->a2a_sbuffer, config->incast_cnt, MPI_DOUBLE, 0, 987, comm));

}
free(request_list);
@@ -49,22 +49,22 @@ int p2p_incast_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int co
int p2p_bcast_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int comm_ranks)
{
int i;
MPI_Request *request_list = NULL;
request_list = malloc(sizeof(MPI_Request) * comm_ranks);
if (request_list == NULL) {
die("Failed to allocate request_list in p2p_bcast_congestor()\n");
}

if(myrank == 0) {

for(i=1; i < comm_ranks; i++) {
mpi_error(MPI_Isend(config->p2p_buffer, config->bcast_cnt, MPI_DOUBLE, i, 987, comm, &request_list[i-1]));
}
mpi_error(MPI_Waitall(comm_ranks-1, request_list, MPI_STATUSES_IGNORE));

} else {

mpi_error(MPI_Recv(config->p2p_buffer, config->bcast_cnt, MPI_DOUBLE, 0, 987, comm, MPI_STATUS_IGNORE));

}
free(request_list);
@@ -76,7 +76,7 @@ int a2a_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int comm_rank
{
int i, pof2, src, dst;
i = 1;

/* comm_size a power-of-two? */
while (i < comm_ranks)
i *= 2;
@@ -87,28 +87,36 @@ int a2a_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int comm_rank

/* do the pairwise exchanges */
for(i = 0; i < comm_ranks; i++) {

if (pof2 == 1) {
/* use exclusive-or algorithm */
src = dst = myrank ^ i;
} else {
src = (myrank - i + comm_ranks) % comm_ranks;
dst = (myrank + i) % comm_ranks;
}
mpi_error(MPI_Sendrecv(&config->a2a_sbuffer[i * config->a2a_cnt], config->a2a_cnt, MPI_DOUBLE,
dst, 987, &config->a2a_rbuffer[i * config->a2a_cnt], config->a2a_cnt, MPI_DOUBLE,
src, 987, comm, MPI_STATUS_IGNORE));
}

return 0;
}

int allreduce_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int comm_ranks)
{
mpi_error(MPI_Allreduce(config->ar_sbuffer, config->ar_rbuffer, config->ar_cnt,
MPI_DOUBLE, MPI_SUM, comm));

return 0;
}

int rma_incast_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int comm_ranks)
{
if (myrank != 0) {

mpi_error(MPI_Put(&config->rma_a2a_buffer[0], config->incast_cnt, MPI_DOUBLE, 0,
(MPI_Aint)(myrank * config->incast_cnt), config->incast_cnt, MPI_DOUBLE, config->rma_a2a_window));
mpi_error(MPI_Win_flush(0, config->rma_a2a_window));

@@ -119,9 +127,9 @@ int rma_incast_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int co

int rma_bcast_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int comm_ranks)
{
if (myrank != 0) {

mpi_error(MPI_Get(&config->rma_buffer[0], config->bcast_cnt, MPI_DOUBLE, 0, 0,
config->bcast_cnt, MPI_DOUBLE, config->rma_window));
mpi_error(MPI_Win_flush(0, config->rma_window));

@@ -130,7 +138,7 @@ int rma_bcast_congestor(CommConfig_t *config, MPI_Comm comm, int myrank, int com
return 0;
}

int congestor(CommConfig_t *config, int n_measurements, int niters, MPI_Comm test_comm, CommTest_t req_test,
int record_perf, double * perfvals, double * perfval, int *real_n_measurements)
{
int i, m, test_myrank, test_nranks;
@@ -161,7 +169,7 @@ int congestor(CommConfig_t *config, int n_measurements, int niters, MPI_Comm tes
timeout = MPI_Wtime() - timeout_t1;
mpi_error(MPI_Iallreduce(MPI_IN_PLACE, &timeout, 1, MPI_DOUBLE, MPI_MAX, test_comm, &req));

if (record_perf) mpi_error(MPI_Barrier(test_comm));
for (i = -1; i < niters; i++) {
if (i == 0) bt1 = MPI_Wtime();
if (i >= 0) clock_gettime(CLOCK_MONOTONIC, &t1);
@@ -170,6 +178,9 @@ int congestor(CommConfig_t *config, int n_measurements, int niters, MPI_Comm tes
case A2A_CONGESTOR:
a2a_congestor(config, test_comm, test_myrank, test_nranks);
break;
case ALLREDUCE_CONGESTOR:
allreduce_congestor(config, test_comm, test_myrank, test_nranks);
break;
case P2P_INCAST_CONGESTOR:
p2p_incast_congestor(config, test_comm, test_myrank, test_nranks);
break;
@@ -208,11 +219,23 @@ int congestor(CommConfig_t *config, int n_measurements, int niters, MPI_Comm tes
for (i = 0; i < *real_n_measurements*niters; i++) {
perfvals[i] = (double)(sizeof(double) * config->a2a_cnt * (test_nranks-1)) / (perfvals[i] * 1024. * 1024.);
}
perfval[0] = (double)(sizeof(double) * config->a2a_cnt * *real_n_measurements*niters *
(test_nranks-1)) / (bt * 1024. * 1024.);

} else if (req_test == ALLREDUCE_CONGESTOR) {

/* we report uni-directional BW in MiB/s/rank. we assume an algorithm,
like recursive halving/recursive doubling, that has each rank send 2X the msglen in the limit
of large N_ranks.
*/
for (i = 0; i < *real_n_measurements*niters; i++) {
perfvals[i] = (double)(sizeof(double) * config->ar_cnt * 2.) / (perfvals[i] * 1024. * 1024.);
}
perfval[0] = (double)(sizeof(double) * config->ar_cnt * *real_n_measurements*niters * 2.) /
(bt * 1024. * 1024.);

} else if (req_test == P2P_INCAST_CONGESTOR || req_test == RMA_INCAST_CONGESTOR) {

/* we report uni-directional BW in MiB/s/rank */
if (test_myrank == 0) {
for (i = 0; i < *real_n_measurements*niters; i++) {
@@ -223,12 +246,12 @@ int congestor(CommConfig_t *config, int n_measurements, int niters, MPI_Comm tes
for (i = 0; i < *real_n_measurements*niters; i++) {
perfvals[i] = (double)(sizeof(double) * config->incast_cnt) / (perfvals[i] * 1024. * 1024.);
}
perfval[0] = (double)(sizeof(double) * config->incast_cnt * *real_n_measurements*niters) /
(bt * 1024. * 1024.);
}

} else if (req_test == P2P_BCAST_CONGESTOR || req_test == RMA_BCAST_CONGESTOR) {

/* we report uni-directional BW in MiB/s/rank */
if (test_myrank == 0) {
for (i = 0; i < *real_n_measurements*niters; i++) {
@@ -239,7 +262,7 @@ int congestor(CommConfig_t *config, int n_measurements, int niters, MPI_Comm tes
for (i = 0; i < *real_n_measurements*niters; i++) {
perfvals[i] = (double)(sizeof(double) * config->bcast_cnt) / (perfvals[i] * 1024. * 1024.);
}
perfval[0] = (double)(sizeof(double) * config->bcast_cnt * *real_n_measurements*niters) /
(bt * 1024. * 1024.);
}

