From 87bd0e131635d6ccc29b65242d83ef87a507e8c4 Mon Sep 17 00:00:00 2001 From: Surabhi Jain Date: Tue, 12 Jun 2018 11:36:59 -0500 Subject: [PATCH] posix: Implement shm reduce using release, gather Intra-node reduce is implemented using release step followed by gather step. Data movement takes place in gather (bottom-up step) in the tree. Release (top-down) step is used for acknowledgement. Root notifies the non-roots that the data was reduced and copied out of its reduce buffer. Hence, children ranks can reuse the reduce buffer for next reduce call. There is a reduce shm buffer per rank, as each rank contributes data in reduce. Each buffer is split into multiple cells, so the copying in of the next chunk by children can be overlapped with reduce and copy out by the parent rank for the previous cells (pipelining). Large messages are split into chunks of cell size each and pipelining is used. --- src/mpid/ch4/shm/posix/posix_coll.h | 13 ++ .../ch4/shm/posix/posix_coll_containers.h | 3 +- .../shm/posix/posix_coll_globals_default.c | 4 + src/mpid/ch4/shm/posix/posix_coll_init.c | 9 +- src/mpid/ch4/shm/posix/posix_coll_params.h | 9 +- .../ch4/shm/posix/posix_coll_release_gather.h | 118 ++++++++++++++++++ src/mpid/ch4/shm/posix/posix_coll_select.h | 21 ++++ 7 files changed, 173 insertions(+), 4 deletions(-) diff --git a/src/mpid/ch4/shm/posix/posix_coll.h b/src/mpid/ch4/shm/posix/posix_coll.h index 660ddd1fec5..4cab059693b 100644 --- a/src/mpid/ch4/shm/posix/posix_coll.h +++ b/src/mpid/ch4/shm/posix/posix_coll.h @@ -592,6 +592,19 @@ static inline int MPIDI_POSIX_mpi_reduce(const void *sendbuf, void *recvbuf, int MPIR_Reduce_intra_binomial(sendbuf, recvbuf, count, datatype, op, root, comm, errflag); break; + case MPIDI_POSIX_Reduce_intra_release_gather_id: +#ifdef ENABLE_IZEM_ATOMIC + mpi_errno = + MPIDI_POSIX_mpi_reduce_release_gather(sendbuf, recvbuf, count, datatype, + op, root, comm, errflag); +#else + /* else block is needed to keep the compiler happy */ + /* release_gather based algorithms cannot be used without izem submodule */ + MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**noizem"); +#endif + break; + case MPIDI_POSIX_Reduce_intra_invalid_id: + MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**noizem"); default: mpi_errno = MPIR_Reduce_impl(sendbuf, recvbuf, count, datatype, op, root, comm, errflag); diff --git a/src/mpid/ch4/shm/posix/posix_coll_containers.h b/src/mpid/ch4/shm/posix/posix_coll_containers.h index 3a0be269b9a..f335566d5c0 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_containers.h +++ b/src/mpid/ch4/shm/posix/posix_coll_containers.h @@ -15,7 +15,8 @@ extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Bcast_intra_release_g /* Reduce POSIX containers declaration */ extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_binomial_cnt; extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_reduce_scatter_gather_cnt; -extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_reduce_invalid_cnt; +extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_invalid_cnt; +extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_release_gather_cnt; /* Allreduce POSIX containers declaration */ extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Allreduce_intra_recursive_doubling_cnt; diff --git a/src/mpid/ch4/shm/posix/posix_coll_globals_default.c b/src/mpid/ch4/shm/posix/posix_coll_globals_default.c index c4fe3244134..ab6f3ad95cd 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_globals_default.c +++ b/src/mpid/ch4/shm/posix/posix_coll_globals_default.c @@ -43,6 +43,10 @@ const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_invalid_cnt = { .id = MPIDI_POSIX_Reduce_intra_invalid_id }; +const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_release_gather_cnt = { + .id = MPIDI_POSIX_Reduce_intra_release_gather_id +}; + /* Allreduce default POSIX containers initialization*/ const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Allreduce_intra_recursive_doubling_cnt = { .id = MPIDI_POSIX_Allreduce_intra_recursive_doubling_id diff --git a/src/mpid/ch4/shm/posix/posix_coll_init.c b/src/mpid/ch4/shm/posix/posix_coll_init.c index 857803c6290..080d9723d4f 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_init.c +++ b/src/mpid/ch4/shm/posix/posix_coll_init.c @@ -38,7 +38,9 @@ scope : MPI_T_SCOPE_ALL_EQ description : >- Variable to select algorithm for intra-node reduce - auto - Internal algorithm selection from pt2pt based algorithms + auto - Internal algorithm selection from pt2pt based algorithms + release_gather - Force shm optimized algo using release, gather primitives + (izem submodule should be build and enabled for this) === END_MPI_T_CVAR_INFO_BLOCK === */ @@ -57,7 +59,10 @@ int MPIDI_POSIX_coll_cvars_init(void) MPIDI_POSIX_Bcast_algo_choice = MPIDI_POSIX_Bcast_intra_auto_id; /* Reduce */ - MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_auto_id; + if (0 == strcmp(MPIR_CVAR_REDUCE_POSIX_INTRA_ALGORITHM, "release_gather")) + MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_release_gather_id; + else + MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_auto_id; fn_exit: return mpi_errno; diff --git a/src/mpid/ch4/shm/posix/posix_coll_params.h b/src/mpid/ch4/shm/posix/posix_coll_params.h index 3cbbffdaa97..bbf08fdd377 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_params.h +++ b/src/mpid/ch4/shm/posix/posix_coll_params.h @@ -44,11 +44,18 @@ typedef enum { MPIDI_POSIX_Reduce_intra_reduce_scatter_gather_id, MPIDI_POSIX_Reduce_intra_binomial_id, MPIDI_POSIX_Reduce_intra_auto_id, - MPIDI_POSIX_Reduce_intra_invalid_id + MPIDI_POSIX_Reduce_intra_invalid_id, + MPIDI_POSIX_Reduce_intra_release_gather_id } MPIDI_POSIX_Reduce_id_t; typedef union { /* reserved for parameters related to SHM specific collectives */ + struct MPIDI_POSIX_Reduce_release_gather_parameters { + int radix; + int tree_type; + int buffer_size; + int num_buffers; + } posix_reduce_release_gather_parameters; struct MPIDI_POSIX_Reduce_empty_parameters { int empty; } posix_reduce_empty_parameters; diff --git a/src/mpid/ch4/shm/posix/posix_coll_release_gather.h b/src/mpid/ch4/shm/posix/posix_coll_release_gather.h index ae4a85bea07..bb49b786409 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_release_gather.h +++ b/src/mpid/ch4/shm/posix/posix_coll_release_gather.h @@ -152,4 +152,122 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_bcast_release_gather(void *buffer, goto fn_exit; } +/* Intra-node reduce is implemented as a release step followed by gather step in release_gather + * framework. The actual data movement happens in gather step. Release step makes sure that + * the shared reduce buffer can be reused for next reduce call. Release gather framework has + * multitple cells in reduce buffer, so that the copying in next cell can be overlapped with + * reduction and copying out of previous cells (pipelining). + */ +MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_reduce_release_gather(const void *sendbuf, + void *recvbuf, int count, + MPI_Datatype datatype, + MPI_Op op, int root, + MPIR_Comm * comm_ptr, + MPIR_Errflag_t * errflag) +{ + int i, num_chunks, chunk_size_floor, chunk_size_ceil; + int offset = 0, is_contig; + int mpi_errno = MPI_SUCCESS, mpi_errno_ret = MPI_SUCCESS; + MPI_Aint lb, true_extent, extent, type_size; + + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER); + + /* If there is no data, return */ + if (count == 0) { + goto fn_exit; + } + + if ((MPIR_Comm_size(comm_ptr) == 1) && (sendbuf != MPI_IN_PLACE)) { + /* Simply copy the data from sendbuf to recvbuf if there is only 1 rank and MPI_IN_PLACE + * is not used */ + mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, recvbuf, count, datatype); + if (mpi_errno) { + /* for communication errors, just record the error but continue */ + *errflag = MPIR_ERR_OTHER; + MPIR_ERR_SET(mpi_errno, *errflag, "**fail"); + MPIR_ERR_ADD(mpi_errno_ret, mpi_errno); + } + goto fn_exit; + } + + /* Lazy initialization of release_gather specific struct */ + mpi_errno = + MPIDI_POSIX_mpi_release_gather_comm_init(comm_ptr, + MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE); + if (mpi_errno) { + /* Fall back to other algo as release_gather algo cannot be used */ + mpi_errno = + MPIR_Reduce_impl(sendbuf, recvbuf, count, datatype, op, root, comm_ptr, errflag); + goto fn_exit; + } + + MPIR_Type_get_extent_impl(datatype, &lb, &extent); + MPIR_Type_get_true_extent_impl(datatype, &lb, &true_extent); + extent = MPL_MAX(extent, true_extent); + + MPIR_Datatype_is_contig(datatype, &is_contig); + + if (is_contig) { + MPIR_Datatype_get_size_macro(datatype, type_size); + } else { + MPIR_Pack_size_impl(1, datatype, &type_size); + } + + if (sendbuf == MPI_IN_PLACE) { + sendbuf = recvbuf; + } + + /* Calculate chunking information, taking max(extent, type_size) handles contiguous and non-contiguous datatypes both */ + MPIR_Algo_calculate_pipeline_chunk_info(MPIDI_POSIX_RELEASE_GATHER_REDUCE_CELLSIZE, + MPL_MAX(extent, type_size), count, &num_chunks, + &chunk_size_floor, &chunk_size_ceil); + + /* Print chunking information */ + MPL_DBG_MSG_FMT(MPIR_DBG_COLL, VERBOSE, (MPL_DBG_FDEST, + "Reduce shmgr pipeline info: segsize=%d count=%d num_chunks=%d chunk_size_floor=%d chunk_size_ceil=%d \n", + MPIDI_POSIX_RELEASE_GATHER_REDUCE_CELLSIZE, count, + num_chunks, chunk_size_floor, chunk_size_ceil)); + + /* Do pipelined release-gather */ + for (i = 0; i < num_chunks; i++) { + int chunk_count = (i == 0) ? chunk_size_floor : chunk_size_ceil; + + mpi_errno = + MPIDI_POSIX_mpi_release_gather_release(NULL, 0, MPI_DATATYPE_NULL, root, + comm_ptr, errflag, + MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE); + if (mpi_errno) { + /* for communication errors, just record the error but continue */ + *errflag = + MPIX_ERR_PROC_FAILED == + MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER; + MPIR_ERR_SET(mpi_errno, *errflag, "**fail"); + MPIR_ERR_ADD(mpi_errno_ret, mpi_errno); + } + + mpi_errno = + MPIDI_POSIX_mpi_release_gather_gather((char *) sendbuf + offset * extent, + (char *) recvbuf + offset * extent, + chunk_count, datatype, op, root, comm_ptr, + errflag, + MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE); + if (mpi_errno) { + /* for communication errors, just record the error but continue */ + *errflag = + MPIX_ERR_PROC_FAILED == + MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER; + MPIR_ERR_SET(mpi_errno, *errflag, "**fail"); + MPIR_ERR_ADD(mpi_errno_ret, mpi_errno); + } + offset += chunk_count; + } + + fn_exit: + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER); + return mpi_errno; + fn_fail: + goto fn_exit; +} + #endif /* POSIX_COLL_RELEASE_GATHER_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/posix_coll_select.h b/src/mpid/ch4/shm/posix/posix_coll_select.h index ad7cb656f5d..d6aa56a4a2b 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_select.h +++ b/src/mpid/ch4/shm/posix/posix_coll_select.h @@ -101,6 +101,27 @@ MPIDI_POSIX_coll_algo_container_t *MPIDI_POSIX_Reduce_select(const void *sendbuf MPI_Aint type_size = 0; int pof2 = 0; + if (MPIDI_POSIX_Reduce_algo_choice == MPIDI_POSIX_Reduce_intra_release_gather_id && + MPIR_Op_is_commutative(op)) { + /* release_gather based algorithm can be used only if izem submodule is built (and enabled) + * and MPICH is not multi-threaded. Also when the op is commutative */ +#ifdef ENABLE_IZEM_ATOMIC +#ifdef MPICH_IS_THREADED + if (!MPIR_ThreadInfo.isThreaded) { + /* MPICH configured with threading support but not actually used */ + return &MPIDI_POSIX_Reduce_intra_release_gather_cnt; + } +#else + /* MPICH not configured with threading support */ + return &MPIDI_POSIX_Reduce_intra_release_gather_cnt; +#endif /* MPICH_IS_THREADED */ +#else + /* release_gather algo is chosen through CVAR but izem is not built */ + return &MPIDI_POSIX_Reduce_intra_invalid_cnt; +#endif /* ENABLE_IZEM_ATOMIC */ + } + + /* Choose from pt2pt based algorithms */ MPIR_Datatype_get_size_macro(datatype, type_size); pof2 = comm->pof2; if ((count * type_size > MPIR_CVAR_REDUCE_SHORT_MSG_SIZE) &&