Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spc alltoall #12

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions contrib/spc-a2a-looper/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
Example SPC Alltoall Looper
---------------------------

Simple example that loops over `MPI_Alltoall()`. It tests
the `OMPI_SPC_TIME_ALLTOALL` counter. The test calculates
the diff per-rank at the App level and shows this info each
loop iteration (at all ranks).

The counter accumulates and reports the total time spent in all
`MPI_Alltoall()` calls, while the per-rank view shows the diff for each loop.

Pre-reqs
--------
- Patch with `OMPI_SPC_TIME_ALLTOALL` counter
- OMPI build with `--enable-spc`

Usage
-----

```sh
mpirun -np $nprocs ./a2a_looper [N]

# (optional) arg1 - positive-integer for number of loops
```

Example
-------

Run for just 9 loops:

```sh
mpirun \
-np 4 \
--mca mpi_spc_attach OMPI_SPC_TIME_ALLTOALL \
--mca mpi_spc_dump_enabled true \
./a2a_looper 9
```

Notes
-----
- With 20 or fewer loops, every loop is printed; above that,
output is printed only on every 10th loop.

- Initial SPC code bits adapted from `ompi/examples/spc_example.c`

193 changes: 193 additions & 0 deletions contrib/spc-a2a-looper/a2a_looper.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Tue Nov 29 2022 Thomas Naughton <[email protected]>
*
* Loops over MPI_Alltoall() 'MAX_NLOOP' times.
*
* Usage: mpirun -np $nprocs ./a2a_looper [N]
*
* Optional position-sensitive argument:
* arg1 - positive-integer for number of loops
*
* If no args are provided the program uses default values.
*
* Note: Initial SPC code bits adapted from 'ompi/examples/spc_example.c'
*
* TJN: Modified to have only one counter (OMPI_SPC_TIME_ALLTOALL),
* also we calculate the diff per-rank at the App level and show
* this info each run (at all ranks).
*/
#include <errno.h>
#include <limits.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mpi.h>

/* Number of MPI_Alltoall() iterations; overridden by the first command-line argument. */
int MAX_NLOOP = 100;

/*
 * Helper defined in the patched ompi/runtime/ompi_spc.c.
 * NOTE(review): declared here because none of the headers included by this
 * file is known to declare it, and calling an undeclared function is invalid
 * since C99 -- confirm, and drop this once a public header provides it.
 */
extern int ompi_spc_value_diff(char *spc_name,
                               long long prev_value,
                               long long *cur_value,
                               long long *diff);

/*
 * Loops over MPI_Alltoall() MAX_NLOOP times and, after each call, reports the
 * OMPI_SPC_TIME_ALLTOALL counter as seen through MPI_T, the maximum of that
 * value across ranks, and the per-loop difference computed with
 * ompi_spc_value_diff().
 *
 * argv[1] (optional): positive integer number of loops.
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE on a bad argument or when
 * MPI_Alltoall() fails on any rank.  Setup failures (counter not found,
 * allocation failure, unreadable counter) abort the whole job.
 */
int main(int argc, char **argv)
{
    /* MPI_T name of the counter (see also: ompi_spc_counters_t for the list) */
    const char *counter_name = "runtime_spc_OMPI_SPC_TIME_ALLTOALL";
    const int num_elem = 1024;          /* ints exchanged with each peer */

    int rank, size;
    int *inbuf = NULL;
    int *outbuf = NULL;
    int i, nloop, rc;
    int provided, num, name_len, desc_len, verbosity, bind, var_class;
    int readonly, continuous, atomic, count;
    char name[256], description[256];
    MPI_Datatype datatype;
    MPI_T_enum enumtype;
    MPI_T_pvar_handle handle;
    MPI_T_pvar_session session;
    long long value = 0;
    long long prev_value = 0;
    size_t total_elem, k;
    int found = 0;
    int failed = 0;

    /* Validate the loop count up front; atoi() would silently accept garbage. */
    if (argc > 1) {
        char *end = NULL;
        long requested;

        errno = 0;
        requested = strtol(argv[1], &end, 10);
        if (end == argv[1] || '\0' != *end || 0 != errno
            || requested <= 0 || requested > INT_MAX) {
            fprintf(stderr, "Usage: %s [N]   (N: positive integer number of loops)\n", argv[0]);
            return EXIT_FAILURE;
        }
        MAX_NLOOP = (int) requested;
    }

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);

    /* Determine the MPI_T pvar index of the requested SPC */
    MPI_T_pvar_get_num(&num);

    rc = MPI_T_pvar_session_create(&session);
    if (MPI_SUCCESS != rc) {
        fprintf(stderr, "ERROR: Couldn't create an MPI_T pvar session.\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    for (i = 0; i < num && !found; i++) {
        name_len = (int) sizeof(name);
        desc_len = (int) sizeof(description);
        rc = MPI_T_pvar_get_info(i, name, &name_len, &verbosity,
                                 &var_class, &datatype, &enumtype,
                                 description, &desc_len, &bind,
                                 &readonly, &continuous, &atomic);
        if (MPI_SUCCESS != rc || 0 != strcmp(name, counter_name)) {
            continue;
        }

        /* Create the MPI_T handle for the counter and start it */
        rc = MPI_T_pvar_handle_alloc(session, i, NULL, &handle, &count);
        if (MPI_SUCCESS != rc) {
            fprintf(stderr, "ERROR: Couldn't allocate a handle for '%s'.\n", counter_name);
            MPI_Abort(MPI_COMM_WORLD, -1);
        }
        /* Result deliberately ignored: a continuous variable cannot be
         * started and reports an error here even though it is usable. */
        (void) MPI_T_pvar_start(session, handle);
        found = 1;
    }

    /* Make sure we found the counter */
    if (!found) {
        fprintf(stderr, "ERROR: Couldn't find the appropriate SPC counter in the MPI_T pvars.\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    /* Size the buffers in size_t so the element count cannot overflow int. */
    total_elem = (size_t) size * (size_t) num_elem;

    inbuf = malloc(total_elem * sizeof *inbuf);
    outbuf = malloc(total_elem * sizeof *outbuf);
    if (NULL == inbuf || NULL == outbuf) {
        /* Abort the job: finalizing on this rank alone would leave the
         * other ranks blocked in the collectives below. */
        fprintf(stderr, "Error: malloc failed (%s)\n", (NULL == inbuf) ? "inbuf" : "outbuf");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    for (k = 0; k < total_elem; k++) {
        inbuf[k] = 100 + rank;
        outbuf[k] = 0;
    }

    MPI_Barrier(MPI_COMM_WORLD);

    for (nloop = 0; nloop < MAX_NLOOP; nloop++) {
        long long tmp_max = 0;
        long long new_value = 0;
        long long diff = 0;
        int global_rc = MPI_SUCCESS;
        int diff_rc;

        MPI_Barrier(MPI_COMM_WORLD);
        fflush(NULL);

        rc = MPI_Alltoall(inbuf, num_elem, MPI_INT, outbuf, num_elem, MPI_INT, MPI_COMM_WORLD);

        /* Agree on whether Alltoall failed anywhere.  The reduced value is
         * identical on every rank, so every rank leaves the loop together;
         * if only rank 0 left, the others would hang in the next collective. */
        MPI_Allreduce(&rc, &global_rc, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);
        if (MPI_SUCCESS != global_rc) {
            if (0 == rank) {
                fprintf(stderr, "Error: Alltoall failed! (rc=%d)\n", global_rc);
            }
            failed = 1;
            break;
        }

        rc = MPI_T_pvar_read(session, handle, &value);
        if (MPI_SUCCESS != rc) {
            fprintf(stderr, "ERROR: Couldn't read '%s' (rc=%d)\n", counter_name, rc);
            MPI_Abort(MPI_COMM_WORLD, -1);
        }
        MPI_Allreduce(&value, &tmp_max, 1, MPI_LONG_LONG, MPI_MAX, MPI_COMM_WORLD);

        diff_rc = ompi_spc_value_diff("OMPI_SPC_TIME_ALLTOALL",
                                      prev_value,
                                      &new_value,
                                      &diff);

        MPI_Barrier(MPI_COMM_WORLD);

        /* Short runs report every loop; longer runs report every 10th loop. */
        if ((MAX_NLOOP <= 20) || (0 == (nloop % 10))) {
            int usecs = 250000; /* 0.25 sec */

            if (MPI_SUCCESS != diff_rc) {
                fprintf(stderr, "[%d] Warning: ompi_spc_value_diff failed; new_value/diff are not valid\n", rank);
            }
            printf("%12s: Rank: %5d Size: %5d Loop: %8d %s: %lld max: %lld prev_value: %lld new_value: %lld diff: %lld -- SLEEP: %dus\n",
                   "a2a_looper", rank, size, nloop, counter_name, value, tmp_max, prev_value, new_value, diff, usecs);
            usleep(usecs);
        }

        prev_value = new_value;

        fflush(NULL);
        MPI_Barrier(MPI_COMM_WORLD);
    }

    MPI_Barrier(MPI_COMM_WORLD);

    /* Stop the counter, free the handle, and then free the session.  This
     * runs on the failure path too, so nothing is leaked.  The stop result
     * is ignored for the same reason as the start result above. */
    (void) MPI_T_pvar_stop(session, handle);
    MPI_T_pvar_handle_free(session, &handle);
    MPI_T_pvar_session_free(&session);

    free(inbuf);
    free(outbuf);

    MPI_T_finalize();
    MPI_Finalize();

    return failed ? EXIT_FAILURE : EXIT_SUCCESS;
}
6 changes: 6 additions & 0 deletions ompi/mpi/c/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
{
int err;
size_t recvtype_size;
opal_timer_t timer = 0; /* SPC */

SPC_RECORD(OMPI_SPC_ALLTOALL, 1);

Expand Down Expand Up @@ -116,10 +117,15 @@ int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
}
}

SPC_TIMER_START(OMPI_SPC_TIME_ALLTOALL, &timer);

/* Invoke the coll component to perform the back-end operation */
err = comm->c_coll->coll_alltoall(sendbuf, sendcount, sendtype,
recvbuf, recvcount, recvtype,
comm, comm->c_coll->coll_alltoall_module);

SPC_TIMER_STOP(OMPI_SPC_TIME_ALLTOALL, &timer);

OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
}

5 changes: 5 additions & 0 deletions ompi/mpi/c/alltoallv.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ int MPI_Alltoallv(const void *sendbuf, const int sendcounts[],
MPI_Datatype recvtype, MPI_Comm comm)
{
int i, size, err;
opal_timer_t timer = 0; /* SPC */

SPC_RECORD(OMPI_SPC_ALLTOALLV, 1);

Expand Down Expand Up @@ -135,10 +136,14 @@ int MPI_Alltoallv(const void *sendbuf, const int sendcounts[],
}
#endif

SPC_TIMER_START(OMPI_SPC_TIME_ALLTOALLV, &timer);

/* Invoke the coll component to perform the back-end operation */
err = comm->c_coll->coll_alltoallv(sendbuf, sendcounts, sdispls, sendtype,
recvbuf, recvcounts, rdispls, recvtype,
comm, comm->c_coll->coll_alltoallv_module);
SPC_TIMER_STOP(OMPI_SPC_TIME_ALLTOALLV, &timer);

OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
}

69 changes: 69 additions & 0 deletions ompi/runtime/ompi_spc.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ static const ompi_spc_event_t ompi_spc_events_desc[OMPI_SPC_NUM_COUNTERS] = {
SET_COUNTER_ARRAY(OMPI_SPC_TESTALL, "The number of times MPI_Testall was called.", false, false),
SET_COUNTER_ARRAY(OMPI_SPC_TESTANY, "The number of times MPI_Testany was called.", false, false),
SET_COUNTER_ARRAY(OMPI_SPC_TESTSOME, "The number of times MPI_Testsome was called.", false, false),
SET_COUNTER_ARRAY(OMPI_SPC_TIME_ALLTOALL, "The number microseconds spent performing the MPI_Alltoall operation. Note: The timer used on the back end is in cycles, which could potentially be problematic on a system where the clock frequency can change. On such a system, this counter could be inaccurate since we assume a fixed clock rate.", false, true),
SET_COUNTER_ARRAY(OMPI_SPC_TIME_ALLTOALLV, "The number microseconds spent performing the MPI_Alltoallv operation. Note: The timer used on the back end is in cycles, which could potentially be problematic on a system where the clock frequency can change. On such a system, this counter could be inaccurate since we assume a fixed clock rate.", false, true),
SET_COUNTER_ARRAY(OMPI_SPC_WAIT, "The number of times MPI_Wait was called.", false, false),
SET_COUNTER_ARRAY(OMPI_SPC_WAITALL, "The number of times MPI_Waitall was called.", false, false),
SET_COUNTER_ARRAY(OMPI_SPC_WAITANY, "The number of times MPI_Waitany was called.", false, false),
Expand Down Expand Up @@ -397,6 +399,73 @@ static void ompi_spc_dump(void)
ompi_spc_comm->c_coll->coll_barrier(ompi_spc_comm, ompi_spc_comm->c_coll->coll_barrier_module);
}


/*
 * Helper function for checking the diff of a given SPC.
 *
 * Looks up the counter whose name equals 'spc_name', stores its current
 * value in '*cur_value' and, when 'diff' is not NULL, stores the change
 * since the caller's previous reading in '*diff' (diff = new - prev).
 *
 * spc_name   - counter name to look up (required, e.g. "OMPI_SPC_TIME_ALLTOALL").
 * prev_value - value returned through 'cur_value' by the previous call.
 * cur_value  - receives the current counter value (required).
 * diff       - receives (current - prev_value); pass NULL if you do not care
 *              about the diff and only want the current value.
 *
 * Note: The value for timer events is converted to microseconds.
 * Note: Any high watermark event is reset after being read.
 *
 * On success, return MPI_SUCCESS.  Return -1 when a required argument is
 * NULL, when the SPC system is not set up, or when no counter has that name.
 */
int ompi_spc_value_diff(char *spc_name,
                        long long prev_value,
                        long long *cur_value,
                        long long *diff)
{
    int i;
    long long value;

    /* Reject missing required arguments before they reach strcmp() or are
     * written through. */
    if (NULL == spc_name || NULL == cur_value) {
        return -1;
    }

    /* SPC system not setup/available */
    if (NULL == ompi_spc_events) {
        return -1;
    }

    /* Find the index of the given SPC. */
    for (i = 0; i < OMPI_SPC_NUM_COUNTERS; i++) {
        if (0 == strcmp(ompi_spc_events_desc[i].counter_name, spc_name)) {
            break;
        }
    }

    if (OMPI_SPC_NUM_COUNTERS == i) {
        fprintf(stderr, "Error: Failed to find SPC counter '%s'\n", spc_name);
        return -1;
    }

    value = (long long) ompi_spc_events[i].value;

    /* If this is a timer-based counter, convert from cycles to microseconds */
    if (ompi_spc_events[i].is_timer_event) {
        value = ompi_spc_cycles_to_usecs_internal(value);
    }

    /* If this is a high watermark counter, reset it after it has been read */
    if (ompi_spc_events[i].is_high_watermark) {
        ompi_spc_events[i].value = 0;
    }

    *cur_value = value;

    if (NULL != diff) {
        *diff = value - prev_value;
    }

    return MPI_SUCCESS;
}


/* Frees any dynamically allocated OMPI SPC data structures */
void ompi_spc_fini(void)
{
Expand Down
Loading