diff --git a/src/mpid/ch4/shm/posix/posix_coll.h b/src/mpid/ch4/shm/posix/posix_coll.h index 660ddd1fec5..4cab059693b 100644 --- a/src/mpid/ch4/shm/posix/posix_coll.h +++ b/src/mpid/ch4/shm/posix/posix_coll.h @@ -592,6 +592,19 @@ static inline int MPIDI_POSIX_mpi_reduce(const void *sendbuf, void *recvbuf, int MPIR_Reduce_intra_binomial(sendbuf, recvbuf, count, datatype, op, root, comm, errflag); break; + case MPIDI_POSIX_Reduce_intra_release_gather_id: +#ifdef ENABLE_IZEM_ATOMIC + mpi_errno = + MPIDI_POSIX_mpi_reduce_release_gather(sendbuf, recvbuf, count, datatype, + op, root, comm, errflag); +#else + /* else block is needed to keep the compiler happy */ + /* release_gather based algorithms cannot be used without izem submodule */ + MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**noizem"); +#endif + break; + case MPIDI_POSIX_Reduce_intra_invalid_id: + MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**noizem"); default: mpi_errno = MPIR_Reduce_impl(sendbuf, recvbuf, count, datatype, op, root, comm, errflag); diff --git a/src/mpid/ch4/shm/posix/posix_coll_containers.h b/src/mpid/ch4/shm/posix/posix_coll_containers.h index 3a0be269b9a..f335566d5c0 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_containers.h +++ b/src/mpid/ch4/shm/posix/posix_coll_containers.h @@ -15,7 +15,8 @@ extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Bcast_intra_release_g /* Reduce POSIX containers declaration */ extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_binomial_cnt; extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_reduce_scatter_gather_cnt; -extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_reduce_invalid_cnt; +extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_invalid_cnt; +extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_release_gather_cnt; /* Allreduce POSIX containers declaration */ extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Allreduce_intra_recursive_doubling_cnt; diff --git a/src/mpid/ch4/shm/posix/posix_coll_globals_default.c b/src/mpid/ch4/shm/posix/posix_coll_globals_default.c index c4fe3244134..ab6f3ad95cd 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_globals_default.c +++ b/src/mpid/ch4/shm/posix/posix_coll_globals_default.c @@ -43,6 +43,10 @@ const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_invalid_cnt = { .id = MPIDI_POSIX_Reduce_intra_invalid_id }; +const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_release_gather_cnt = { + .id = MPIDI_POSIX_Reduce_intra_release_gather_id +}; + /* Allreduce default POSIX containers initialization*/ const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Allreduce_intra_recursive_doubling_cnt = { .id = MPIDI_POSIX_Allreduce_intra_recursive_doubling_id diff --git a/src/mpid/ch4/shm/posix/posix_coll_init.c b/src/mpid/ch4/shm/posix/posix_coll_init.c index 857803c6290..080d9723d4f 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_init.c +++ b/src/mpid/ch4/shm/posix/posix_coll_init.c @@ -38,7 +38,9 @@ scope : MPI_T_SCOPE_ALL_EQ description : >- Variable to select algorithm for intra-node reduce - auto - Internal algorithm selection from pt2pt based algorithms + auto - Internal algorithm selection from pt2pt based algorithms + release_gather - Force shm optimized algo using release, gather primitives + (izem submodule should be build and enabled for this) === END_MPI_T_CVAR_INFO_BLOCK === */ @@ -57,7 +59,10 @@ int MPIDI_POSIX_coll_cvars_init(void) MPIDI_POSIX_Bcast_algo_choice = MPIDI_POSIX_Bcast_intra_auto_id; /* Reduce */ - MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_auto_id; + if (0 == strcmp(MPIR_CVAR_REDUCE_POSIX_INTRA_ALGORITHM, "release_gather")) + MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_release_gather_id; + else + MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_auto_id; fn_exit: return mpi_errno; diff --git a/src/mpid/ch4/shm/posix/posix_coll_params.h b/src/mpid/ch4/shm/posix/posix_coll_params.h index 3cbbffdaa97..bbf08fdd377 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_params.h +++ b/src/mpid/ch4/shm/posix/posix_coll_params.h @@ -44,11 +44,18 @@ typedef enum { MPIDI_POSIX_Reduce_intra_reduce_scatter_gather_id, MPIDI_POSIX_Reduce_intra_binomial_id, MPIDI_POSIX_Reduce_intra_auto_id, - MPIDI_POSIX_Reduce_intra_invalid_id + MPIDI_POSIX_Reduce_intra_invalid_id, + MPIDI_POSIX_Reduce_intra_release_gather_id } MPIDI_POSIX_Reduce_id_t; typedef union { /* reserved for parameters related to SHM specific collectives */ + struct MPIDI_POSIX_Reduce_release_gather_parameters { + int radix; + int tree_type; + int buffer_size; + int num_buffers; + } posix_reduce_release_gather_parameters; struct MPIDI_POSIX_Reduce_empty_parameters { int empty; } posix_reduce_empty_parameters; diff --git a/src/mpid/ch4/shm/posix/posix_coll_release_gather.h b/src/mpid/ch4/shm/posix/posix_coll_release_gather.h index ae4a85bea07..bb49b786409 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_release_gather.h +++ b/src/mpid/ch4/shm/posix/posix_coll_release_gather.h @@ -152,4 +152,122 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_bcast_release_gather(void *buffer, goto fn_exit; } +/* Intra-node reduce is implemented as a release step followed by gather step in release_gather + * framework. The actual data movement happens in gather step. Release step makes sure that + * the shared reduce buffer can be reused for next reduce call. Release gather framework has + * multitple cells in reduce buffer, so that the copying in next cell can be overlapped with + * reduction and copying out of previous cells (pipelining). + */ +MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_reduce_release_gather(const void *sendbuf, + void *recvbuf, int count, + MPI_Datatype datatype, + MPI_Op op, int root, + MPIR_Comm * comm_ptr, + MPIR_Errflag_t * errflag) +{ + int i, num_chunks, chunk_size_floor, chunk_size_ceil; + int offset = 0, is_contig; + int mpi_errno = MPI_SUCCESS, mpi_errno_ret = MPI_SUCCESS; + MPI_Aint lb, true_extent, extent, type_size; + + MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER); + MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER); + + /* If there is no data, return */ + if (count == 0) { + goto fn_exit; + } + + if ((MPIR_Comm_size(comm_ptr) == 1) && (sendbuf != MPI_IN_PLACE)) { + /* Simply copy the data from sendbuf to recvbuf if there is only 1 rank and MPI_IN_PLACE + * is not used */ + mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, recvbuf, count, datatype); + if (mpi_errno) { + /* for communication errors, just record the error but continue */ + *errflag = MPIR_ERR_OTHER; + MPIR_ERR_SET(mpi_errno, *errflag, "**fail"); + MPIR_ERR_ADD(mpi_errno_ret, mpi_errno); + } + goto fn_exit; + } + + /* Lazy initialization of release_gather specific struct */ + mpi_errno = + MPIDI_POSIX_mpi_release_gather_comm_init(comm_ptr, + MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE); + if (mpi_errno) { + /* Fall back to other algo as release_gather algo cannot be used */ + mpi_errno = + MPIR_Reduce_impl(sendbuf, recvbuf, count, datatype, op, root, comm_ptr, errflag); + goto fn_exit; + } + + MPIR_Type_get_extent_impl(datatype, &lb, &extent); + MPIR_Type_get_true_extent_impl(datatype, &lb, &true_extent); + extent = MPL_MAX(extent, true_extent); + + MPIR_Datatype_is_contig(datatype, &is_contig); + + if (is_contig) { + MPIR_Datatype_get_size_macro(datatype, type_size); + } else { + MPIR_Pack_size_impl(1, datatype, &type_size); + } + + if (sendbuf == MPI_IN_PLACE) { + sendbuf = recvbuf; + } + + /* Calculate chunking information, taking max(extent, type_size) handles contiguous and non-contiguous datatypes both */ + MPIR_Algo_calculate_pipeline_chunk_info(MPIDI_POSIX_RELEASE_GATHER_REDUCE_CELLSIZE, + MPL_MAX(extent, type_size), count, &num_chunks, + &chunk_size_floor, &chunk_size_ceil); + + /* Print chunking information */ + MPL_DBG_MSG_FMT(MPIR_DBG_COLL, VERBOSE, (MPL_DBG_FDEST, + "Reduce shmgr pipeline info: segsize=%d count=%d num_chunks=%d chunk_size_floor=%d chunk_size_ceil=%d \n", + MPIDI_POSIX_RELEASE_GATHER_REDUCE_CELLSIZE, count, + num_chunks, chunk_size_floor, chunk_size_ceil)); + + /* Do pipelined release-gather */ + for (i = 0; i < num_chunks; i++) { + int chunk_count = (i == 0) ? chunk_size_floor : chunk_size_ceil; + + mpi_errno = + MPIDI_POSIX_mpi_release_gather_release(NULL, 0, MPI_DATATYPE_NULL, root, + comm_ptr, errflag, + MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE); + if (mpi_errno) { + /* for communication errors, just record the error but continue */ + *errflag = + MPIX_ERR_PROC_FAILED == + MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER; + MPIR_ERR_SET(mpi_errno, *errflag, "**fail"); + MPIR_ERR_ADD(mpi_errno_ret, mpi_errno); + } + + mpi_errno = + MPIDI_POSIX_mpi_release_gather_gather((char *) sendbuf + offset * extent, + (char *) recvbuf + offset * extent, + chunk_count, datatype, op, root, comm_ptr, + errflag, + MPIDI_POSIX_RELEASE_GATHER_OPCODE_REDUCE); + if (mpi_errno) { + /* for communication errors, just record the error but continue */ + *errflag = + MPIX_ERR_PROC_FAILED == + MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER; + MPIR_ERR_SET(mpi_errno, *errflag, "**fail"); + MPIR_ERR_ADD(mpi_errno_ret, mpi_errno); + } + offset += chunk_count; + } + + fn_exit: + MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_POSIX_MPI_REDUCE_RELEASE_GATHER); + return mpi_errno; + fn_fail: + goto fn_exit; +} + #endif /* POSIX_COLL_RELEASE_GATHER_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/posix_coll_select.h b/src/mpid/ch4/shm/posix/posix_coll_select.h index ad7cb656f5d..d6aa56a4a2b 100644 --- a/src/mpid/ch4/shm/posix/posix_coll_select.h +++ b/src/mpid/ch4/shm/posix/posix_coll_select.h @@ -101,6 +101,27 @@ MPIDI_POSIX_coll_algo_container_t *MPIDI_POSIX_Reduce_select(const void *sendbuf MPI_Aint type_size = 0; int pof2 = 0; + if (MPIDI_POSIX_Reduce_algo_choice == MPIDI_POSIX_Reduce_intra_release_gather_id && + MPIR_Op_is_commutative(op)) { + /* release_gather based algorithm can be used only if izem submodule is built (and enabled) + * and MPICH is not multi-threaded. Also when the op is commutative */ +#ifdef ENABLE_IZEM_ATOMIC +#ifdef MPICH_IS_THREADED + if (!MPIR_ThreadInfo.isThreaded) { + /* MPICH configured with threading support but not actually used */ + return &MPIDI_POSIX_Reduce_intra_release_gather_cnt; + } +#else + /* MPICH not configured with threading support */ + return &MPIDI_POSIX_Reduce_intra_release_gather_cnt; +#endif /* MPICH_IS_THREADED */ +#else + /* release_gather algo is chosen through CVAR but izem is not built */ + return &MPIDI_POSIX_Reduce_intra_invalid_cnt; +#endif /* ENABLE_IZEM_ATOMIC */ + } + + /* Choose from pt2pt based algorithms */ MPIR_Datatype_get_size_macro(datatype, type_size); pof2 = comm->pof2; if ((count * type_size > MPIR_CVAR_REDUCE_SHORT_MSG_SIZE) &&