Skip to content

Commit

Permalink
posix: Implement shm reduce using release, gather
Browse files Browse the repository at this point in the history
Intra-node reduce is implemented using a release step followed by a gather
step. Data movement takes place in the gather (bottom-up) step in the tree.
The release (top-down) step is used for acknowledgement: the root notifies the
non-roots that the data was reduced and copied out of its reduce buffer.
Hence, children ranks can reuse the reduce buffer for the next reduce call.
There is a reduce shm buffer per rank, as each rank contributes data in
reduce. Each buffer is split into multiple cells, so the copying in of
the next chunk by children can be overlapped with the reduce and copy out by
the parent rank for the previous cells (pipelining). Large messages are
split into chunks of cell size each and pipelining is used.
  • Loading branch information
jainsura-intel committed Apr 11, 2019
1 parent e546ffc commit 87bd0e1
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 4 deletions.
13 changes: 13 additions & 0 deletions src/mpid/ch4/shm/posix/posix_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,19 @@ static inline int MPIDI_POSIX_mpi_reduce(const void *sendbuf, void *recvbuf, int
MPIR_Reduce_intra_binomial(sendbuf, recvbuf, count, datatype,
op, root, comm, errflag);
break;
case MPIDI_POSIX_Reduce_intra_release_gather_id:
#ifdef ENABLE_IZEM_ATOMIC
mpi_errno =
MPIDI_POSIX_mpi_reduce_release_gather(sendbuf, recvbuf, count, datatype,
op, root, comm, errflag);
#else
/* else block is needed to keep the compiler happy */
/* release_gather based algorithms cannot be used without izem submodule */
MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**noizem");
#endif
break;
case MPIDI_POSIX_Reduce_intra_invalid_id:
MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**noizem");
default:
mpi_errno = MPIR_Reduce_impl(sendbuf, recvbuf, count, datatype, op,
root, comm, errflag);
Expand Down
3 changes: 2 additions & 1 deletion src/mpid/ch4/shm/posix/posix_coll_containers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Bcast_intra_release_g
/* Reduce POSIX containers declaration */
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_binomial_cnt;
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_reduce_scatter_gather_cnt;
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_reduce_invalid_cnt;
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_invalid_cnt;
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_release_gather_cnt;

/* Allreduce POSIX containers declaration */
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Allreduce_intra_recursive_doubling_cnt;
Expand Down
4 changes: 4 additions & 0 deletions src/mpid/ch4/shm/posix/posix_coll_globals_default.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_invalid_cnt = {
.id = MPIDI_POSIX_Reduce_intra_invalid_id
};

/* Reduce container whose id selects the release_gather algorithm */
const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_release_gather_cnt =
    {.id = MPIDI_POSIX_Reduce_intra_release_gather_id };

/* Allreduce default POSIX containers initialization*/
const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Allreduce_intra_recursive_doubling_cnt = {
.id = MPIDI_POSIX_Allreduce_intra_recursive_doubling_id
Expand Down
9 changes: 7 additions & 2 deletions src/mpid/ch4/shm/posix/posix_coll_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Variable to select algorithm for intra-node reduce
auto - Internal algorithm selection from pt2pt based algorithms
auto - Internal algorithm selection from pt2pt based algorithms
release_gather - Force shm optimized algo using release, gather primitives
(izem submodule should be built and enabled for this)
=== END_MPI_T_CVAR_INFO_BLOCK ===
*/
Expand All @@ -57,7 +59,10 @@ int MPIDI_POSIX_coll_cvars_init(void)
MPIDI_POSIX_Bcast_algo_choice = MPIDI_POSIX_Bcast_intra_auto_id;

/* Reduce */
MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_auto_id;
if (0 == strcmp(MPIR_CVAR_REDUCE_POSIX_INTRA_ALGORITHM, "release_gather"))
MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_release_gather_id;
else
MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_auto_id;

fn_exit:
return mpi_errno;
Expand Down
9 changes: 8 additions & 1 deletion src/mpid/ch4/shm/posix/posix_coll_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ typedef enum {
MPIDI_POSIX_Reduce_intra_reduce_scatter_gather_id,
MPIDI_POSIX_Reduce_intra_binomial_id,
MPIDI_POSIX_Reduce_intra_auto_id,
MPIDI_POSIX_Reduce_intra_invalid_id
MPIDI_POSIX_Reduce_intra_invalid_id,
MPIDI_POSIX_Reduce_intra_release_gather_id
} MPIDI_POSIX_Reduce_id_t;

typedef union {
/* reserved for parameters related to SHM specific collectives */
struct MPIDI_POSIX_Reduce_release_gather_parameters {
int radix;
int tree_type;
int buffer_size;
int num_buffers;
} posix_reduce_release_gather_parameters;
struct MPIDI_POSIX_Reduce_empty_parameters {
int empty;
} posix_reduce_empty_parameters;
Expand Down
118 changes: 118 additions & 0 deletions src/mpid/ch4/shm/posix/posix_coll_release_gather.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,122 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_bcast_release_gather(void *buffer,
goto fn_exit;
}

/* Intra-node reduce is implemented as a release step followed by a gather step in the
 * release_gather framework. The actual data movement happens in the gather step. The release
 * step makes sure that the shared reduce buffer can be reused for the next reduce call. The
 * release_gather framework has multiple cells in the reduce buffer, so that copying into the
 * next cell can be overlapped with reduction and copying out of previous cells (pipelining).
 *
 * Behavior:
 *  - count == 0: nothing to do, MPI_SUCCESS is returned.
 *  - single rank without MPI_IN_PLACE: sendbuf is copied to recvbuf locally.
 *  - release_gather initialization fails: the call falls back to MPIR_Reduce_impl and its
 *    result is returned.
 *  - otherwise the message is split into chunks and each chunk goes through one release and
 *    one gather call. Errors from these calls are recorded in *errflag and accumulated; all
 *    chunks are still attempted and the accumulated error is returned at the end.
 */
MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_reduce_release_gather(const void *sendbuf,
                                                                   void *recvbuf, int count,
                                                                   MPI_Datatype datatype,
                                                                   MPI_Op op, int root,
                                                                   MPIR_Comm * comm_ptr,
                                                                   MPIR_Errflag_t * errflag)
{
    int i, num_chunks, chunk_size_floor, chunk_size_ceil;
    int offset = 0, is_contig;
    int mpi_errno = MPI_SUCCESS, mpi_errno_ret = MPI_SUCCESS;
    MPI_Aint lb, true_extent, extent, type_size;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER);

    /* If there is no data, return */
    if (count == 0) {
        goto fn_exit;
    }

    if ((MPIR_Comm_size(comm_ptr) == 1) && (sendbuf != MPI_IN_PLACE)) {
        /* Simply copy the data from sendbuf to recvbuf if there is only 1 rank and MPI_IN_PLACE
         * is not used */
        mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, recvbuf, count, datatype);
        if (mpi_errno) {
            /* for communication errors, just record the error but continue */
            *errflag = MPIR_ERR_OTHER;
            MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
            MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
        }
        goto fn_exit;
    }

    /* Lazy initialization of release_gather specific struct */
    mpi_errno =
        MPIDI_POSIX_mpi_release_gather_comm_init(comm_ptr,
                                                 MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE);
    if (mpi_errno) {
        /* Fall back to other algo as release_gather algo cannot be used */
        mpi_errno =
            MPIR_Reduce_impl(sendbuf, recvbuf, count, datatype, op, root, comm_ptr, errflag);
        goto fn_exit;
    }

    MPIR_Type_get_extent_impl(datatype, &lb, &extent);
    MPIR_Type_get_true_extent_impl(datatype, &lb, &true_extent);
    extent = MPL_MAX(extent, true_extent);

    MPIR_Datatype_is_contig(datatype, &is_contig);

    if (is_contig) {
        MPIR_Datatype_get_size_macro(datatype, type_size);
    } else {
        MPIR_Pack_size_impl(1, datatype, &type_size);
    }

    /* With MPI_IN_PLACE the contribution of this rank is taken from recvbuf */
    if (sendbuf == MPI_IN_PLACE) {
        sendbuf = recvbuf;
    }

    /* Calculate chunking information, taking max(extent, type_size) handles contiguous and non-contiguous datatypes both */
    MPIR_Algo_calculate_pipeline_chunk_info(MPIDI_POSIX_RELEASE_GATHER_REDUCE_CELLSIZE,
                                            MPL_MAX(extent, type_size), count, &num_chunks,
                                            &chunk_size_floor, &chunk_size_ceil);

    /* Print chunking information */
    MPL_DBG_MSG_FMT(MPIR_DBG_COLL, VERBOSE, (MPL_DBG_FDEST,
                                             "Reduce shmgr pipeline info: segsize=%d count=%d num_chunks=%d chunk_size_floor=%d chunk_size_ceil=%d \n",
                                             MPIDI_POSIX_RELEASE_GATHER_REDUCE_CELLSIZE, count,
                                             num_chunks, chunk_size_floor, chunk_size_ceil));

    /* Do pipelined release-gather */
    for (i = 0; i < num_chunks; i++) {
        /* The first chunk uses the floor size, every following chunk uses the ceil size */
        int chunk_count = (i == 0) ? chunk_size_floor : chunk_size_ceil;

        /* Release step carries no data for reduce, it is only used for acknowledgement */
        mpi_errno =
            MPIDI_POSIX_mpi_release_gather_release(NULL, 0, MPI_DATATYPE_NULL, root,
                                                   comm_ptr, errflag,
                                                   MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE);
        if (mpi_errno) {
            /* for communication errors, just record the error but continue */
            *errflag =
                MPIX_ERR_PROC_FAILED ==
                MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
            MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
            MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
        }

        /* Gather step moves and reduces the current chunk; offset is counted in elements */
        mpi_errno =
            MPIDI_POSIX_mpi_release_gather_gather((char *) sendbuf + offset * extent,
                                                  (char *) recvbuf + offset * extent,
                                                  chunk_count, datatype, op, root, comm_ptr,
                                                  errflag,
                                                  MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE);
        if (mpi_errno) {
            /* for communication errors, just record the error but continue */
            *errflag =
                MPIX_ERR_PROC_FAILED ==
                MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
            MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
            MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
        }
        offset += chunk_count;
    }

    /* mpi_errno only holds the status of the last gather call. Report the accumulated error so
     * that a failure in a release step or in an earlier chunk is not silently dropped. */
    if (mpi_errno_ret != MPI_SUCCESS) {
        mpi_errno = mpi_errno_ret;
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#endif /* POSIX_COLL_RELEASE_GATHER_H_INCLUDED */
21 changes: 21 additions & 0 deletions src/mpid/ch4/shm/posix/posix_coll_select.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,27 @@ MPIDI_POSIX_coll_algo_container_t *MPIDI_POSIX_Reduce_select(const void *sendbuf
MPI_Aint type_size = 0;
int pof2 = 0;

if (MPIDI_POSIX_Reduce_algo_choice == MPIDI_POSIX_Reduce_intra_release_gather_id &&
MPIR_Op_is_commutative(op)) {
/* release_gather based algorithm can be used only if izem submodule is built (and enabled),
* MPICH is not multi-threaded, and the op is commutative */
#ifdef ENABLE_IZEM_ATOMIC
#ifdef MPICH_IS_THREADED
if (!MPIR_ThreadInfo.isThreaded) {
/* MPICH configured with threading support but not actually used */
return &MPIDI_POSIX_Reduce_intra_release_gather_cnt;
}
#else
/* MPICH not configured with threading support */
return &MPIDI_POSIX_Reduce_intra_release_gather_cnt;
#endif /* MPICH_IS_THREADED */
#else
/* release_gather algo is chosen through CVAR but izem is not built */
return &MPIDI_POSIX_Reduce_intra_invalid_cnt;
#endif /* ENABLE_IZEM_ATOMIC */
}

/* Choose from pt2pt based algorithms */
MPIR_Datatype_get_size_macro(datatype, type_size);
pof2 = comm->pof2;
if ((count * type_size > MPIR_CVAR_REDUCE_SHORT_MSG_SIZE) &&
Expand Down

0 comments on commit 87bd0e1

Please sign in to comment.