Skip to content

Commit

Permalink
posix: Implement shm bcast using release, gather
Browse files Browse the repository at this point in the history
Intra-node bcast is implemented using release step followed by gather
step. Data movement takes place in release (top-down step) in the tree.
Gather (bottom-up step) is used for acknowledgement. Non-roots notify
the root that the data was copied out of shared bcast buffer and root
can reuse the buffer for next bcast call. Bcast buffer is split into
multiple cells, so that the copying in of the next chunk by root can be
overlapped with copying out of previous chunks by non-roots
(pipelining). Large messages are split into chunks of cell size each and
pipelining is used.
  • Loading branch information
jainsura-intel committed Apr 11, 2019
1 parent 6c9fd39 commit e546ffc
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/mpid/ch4/shm/posix/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ noinst_HEADERS += src/mpid/ch4/shm/posix/posix_am.h \
src/mpid/ch4/shm/posix/posix_proc.h \
src/mpid/ch4/shm/posix/posix_types.h

if ENABLE_IZEM_ATOMIC
noinst_HEADERS += src/mpid/ch4/shm/posix/posix_coll_release_gather.h
endif
mpi_core_sources += src/mpid/ch4/shm/posix/globals.c \
src/mpid/ch4/shm/posix/posix_comm.c \
src/mpid/ch4/shm/posix/posix_init.c \
Expand Down
15 changes: 15 additions & 0 deletions src/mpid/ch4/shm/posix/posix_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#include "ch4_coll_select.h"
#include "posix_coll_params.h"
#include "posix_coll_select.h"
#ifdef ENABLE_IZEM_ATOMIC
#include "posix_coll_release_gather.h"
#endif

static inline int MPIDI_POSIX_mpi_barrier(MPIR_Comm * comm, MPIR_Errflag_t * errflag,
const void *ch4_algo_parameters_container_in)
Expand Down Expand Up @@ -77,6 +80,18 @@ static inline int MPIDI_POSIX_mpi_bcast(void *buffer, int count, MPI_Datatype da
MPIR_Bcast_intra_scatter_ring_allgather(buffer, count, datatype,
root, comm, errflag);
break;
case MPIDI_POSIX_Bcast_intra_release_gather_id:
#ifdef ENABLE_IZEM_ATOMIC
mpi_errno =
MPIDI_POSIX_mpi_bcast_release_gather(buffer, count, datatype, root, comm, errflag);
#else
/* else block is needed to keep the compiler happy */
/* release_gather based algorithms cannot be used without izem submodule */
MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**noizem");
#endif
break;
case MPIDI_POSIX_Bcast_intra_invalid_id:
MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**noizem");
default:
mpi_errno = MPIR_Bcast_impl(buffer, count, datatype, root, comm, errflag);
break;
Expand Down
1 change: 1 addition & 0 deletions src/mpid/ch4/shm/posix/posix_coll_containers.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ extern const MPIDI_POSIX_coll_algo_container_t
MPIDI_POSIX_Bcast_intra_scatter_recursive_doubling_allgather_cnt;
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Bcast_intra_scatter_ring_allgather_cnt;
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Bcast_intra_invalid_cnt;
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Bcast_intra_release_gather_cnt;

/* Reduce POSIX containers declaration */
extern const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_binomial_cnt;
Expand Down
4 changes: 4 additions & 0 deletions src/mpid/ch4/shm/posix/posix_coll_globals_default.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Bcast_intra_invalid_cnt = {
.id = MPIDI_POSIX_Bcast_intra_invalid_id
};

/* Container carrying the id of the release_gather based intra-node bcast algorithm; only the
 * id field is initialized here. */
const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Bcast_intra_release_gather_cnt = {
.id = MPIDI_POSIX_Bcast_intra_release_gather_id
};

/* Reduce default POSIX containers initialization*/
const MPIDI_POSIX_coll_algo_container_t MPIDI_POSIX_Reduce_intra_reduce_scatter_gather_cnt = {
.id = MPIDI_POSIX_Reduce_intra_reduce_scatter_gather_id
Expand Down
9 changes: 7 additions & 2 deletions src/mpid/ch4/shm/posix/posix_coll_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Variable to select algorithm for intra-node bcast
auto - Internal algorithm selection from pt2pt based algorithms
auto - Internal algorithm selection from pt2pt based algorithms
release_gather - Force shm optimized algo using release, gather primitives
(izem submodule should be built and enabled for this)
- name : MPIR_CVAR_REDUCE_POSIX_INTRA_ALGORITHM
category : COLLECTIVE
Expand All @@ -49,7 +51,10 @@ int MPIDI_POSIX_coll_cvars_init(void)
int mpi_errno = MPI_SUCCESS;

/* Bcast */
MPIDI_POSIX_Bcast_algo_choice = MPIDI_POSIX_Bcast_intra_auto_id;
if (0 == strcmp(MPIR_CVAR_BCAST_POSIX_INTRA_ALGORITHM, "release_gather"))
MPIDI_POSIX_Bcast_algo_choice = MPIDI_POSIX_Bcast_intra_release_gather_id;
else
MPIDI_POSIX_Bcast_algo_choice = MPIDI_POSIX_Bcast_intra_auto_id;

/* Reduce */
MPIDI_POSIX_Reduce_algo_choice = MPIDI_POSIX_Reduce_intra_auto_id;
Expand Down
9 changes: 8 additions & 1 deletion src/mpid/ch4/shm/posix/posix_coll_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ typedef enum {
MPIDI_POSIX_Bcast_intra_scatter_recursive_doubling_allgather_id,
MPIDI_POSIX_Bcast_intra_scatter_ring_allgather_id,
MPIDI_POSIX_Bcast_intra_auto_id,
MPIDI_POSIX_Bcast_intra_invalid_id
MPIDI_POSIX_Bcast_intra_invalid_id,
MPIDI_POSIX_Bcast_intra_release_gather_id
} MPIDI_POSIX_Bcast_id_t;

typedef union {
Expand All @@ -26,6 +27,12 @@ typedef union {
int radix;
int block_size;
} posix_bcast_knomial_parameters;
struct MPIDI_POSIX_Bcast_release_gather_parameters {
int radix;
int tree_type;
int buffer_size;
int num_buffers;
} posix_bcast_release_gather_parameters;
struct MPIDI_POSIX_Bcast_empty_parameters {
int empty;
} posix_bcast_empty_parameters;
Expand Down
155 changes: 155 additions & 0 deletions src/mpid/ch4/shm/posix/posix_coll_release_gather.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
*
* (C) 2018 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/

#ifndef POSIX_COLL_RELEASE_GATHER_H_INCLUDED
#define POSIX_COLL_RELEASE_GATHER_H_INCLUDED

#include "mpiimpl.h"
#include "algo_common.h"
#include "release_gather.h"

/* Intra-node bcast is implemented as a release step followed by gather step in release_gather
* framework. The actual data movement happens in release step. Gather step makes sure that
* the shared bcast buffer can be reused for next bcast call. Release gather framework has
* multiple cells in bcast buffer, so that the copying in next cell can be overlapped with
* copying out of previous cells (pipelining).
*/
/* Broadcast `count` elements of `datatype` from `root` to all ranks of `comm_ptr` using the
 * release (data movement) and gather (acknowledgement) primitives.
 *
 * buffer   - data to broadcast on root; destination of the data on every other rank
 * count    - number of `datatype` elements in `buffer`
 * datatype - datatype of the elements; non-contiguous types are packed into a temporary
 *            byte buffer on root and unpacked from one on the other ranks
 * root     - rank whose data is broadcast
 * comm_ptr - communicator the bcast runs on
 * errflag  - set when a pack, release, gather or unpack step reports an error
 *
 * Returns MPI_SUCCESS or an MPI error code. Errors from individual pipeline steps are recorded
 * and the remaining steps are still executed; the accumulated error is returned at the end.
 * If release_gather cannot be initialized for the communicator the call falls back to
 * MPIR_Bcast_impl. A failed allocation of the pack buffer aborts the call immediately.
 */
MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_mpi_bcast_release_gather(void *buffer,
                                                                  int count,
                                                                  MPI_Datatype datatype,
                                                                  int root, MPIR_Comm * comm_ptr,
                                                                  MPIR_Errflag_t * errflag)
{
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_POSIX_MPI_BCAST_RELEASE_GATHER);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_POSIX_MPI_BCAST_RELEASE_GATHER);

    int i, my_rank, num_chunks, chunk_count_floor, chunk_count_ceil;
    int offset = 0, is_contig, ori_count = count;
    int mpi_errno = MPI_SUCCESS, mpi_errno_ret = MPI_SUCCESS;
    MPI_Aint position;
    MPI_Aint lb, true_extent, extent, type_size;
    void *ori_buffer = buffer;
    MPI_Datatype ori_datatype = datatype;

    /* If there is only one process or no data, return */
    if (count == 0 || (MPIR_Comm_size(comm_ptr) == 1)) {
        goto fn_exit;
    }

    /* Lazy initialization of release_gather specific struct */
    mpi_errno =
        MPIDI_POSIX_mpi_release_gather_comm_init(comm_ptr, MPIDI_POSIX_RELEASE_GATHER_OPCODE_BCAST);
    if (mpi_errno) {
        /* Fall back to other algo as release_gather based bcast cannot be used */
        mpi_errno = MPIR_Bcast_impl(buffer, count, datatype, root, comm_ptr, errflag);
        goto fn_exit;
    }

    my_rank = MPIR_Comm_rank(comm_ptr);
    MPIR_Type_get_extent_impl(datatype, &lb, &extent);
    MPIR_Type_get_true_extent_impl(datatype, &lb, &true_extent);
    extent = MPL_MAX(extent, true_extent);

    MPIR_Datatype_is_contig(datatype, &is_contig);

    if (is_contig) {
        MPIR_Datatype_get_size_macro(datatype, type_size);
    } else {
        MPIR_Pack_size_impl(1, datatype, &type_size);
    }

    if (!is_contig || type_size >= MPIDI_POSIX_RELEASE_GATHER_BCAST_CELLSIZE) {
        /* Convert to MPI_BYTE datatype so that the message can be cut at byte granularity */
        count = type_size * count;
        datatype = MPI_BYTE;
        type_size = 1;
        extent = 1;

        if (!is_contig) {
            /* Every rank stages the packed representation in a temporary buffer */
            buffer = MPL_malloc(count, MPL_MEM_COLL);
            if (buffer == NULL) {
                /* Without the staging buffer nothing can be packed, moved or unpacked */
                *errflag = MPIR_ERR_OTHER;
                MPIR_ERR_SET(mpi_errno, *errflag, "**nomem");
                goto fn_fail;
            }
            if (my_rank == root) {
                /* Root packs the data before sending, for non contiguous datatypes */
                position = 0;
                mpi_errno =
                    MPIR_Pack_impl(ori_buffer, ori_count, ori_datatype, buffer, count, &position);
                if (mpi_errno) {
                    /* for communication errors, just record the error but continue */
                    *errflag = MPIR_ERR_OTHER;
                    MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
                    MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
                }
            }
        }
    }

    /* Calculate chunking information for pipelining */
    MPIR_Algo_calculate_pipeline_chunk_info(MPIDI_POSIX_RELEASE_GATHER_BCAST_CELLSIZE, type_size,
                                            count, &num_chunks, &chunk_count_floor,
                                            &chunk_count_ceil);
    /* Print chunking information */
    MPL_DBG_MSG_FMT(MPIR_DBG_COLL, VERBOSE, (MPL_DBG_FDEST,
                                             "Bcast shmgr pipeline info: segsize=%d count=%d num_chunks=%d chunk_count_floor=%d chunk_count_ceil=%d \n",
                                             MPIDI_POSIX_RELEASE_GATHER_BCAST_CELLSIZE, count,
                                             num_chunks, chunk_count_floor, chunk_count_ceil));

    /* Do pipelined release-gather */
    for (i = 0; i < num_chunks; i++) {
        /* Only the first chunk uses the floor size; all following chunks use the ceil size */
        int chunk_count = (i == 0) ? chunk_count_floor : chunk_count_ceil;

        mpi_errno =
            MPIDI_POSIX_mpi_release_gather_release((char *) buffer + offset * extent,
                                                   chunk_count, datatype, root, comm_ptr,
                                                   errflag,
                                                   MPIDI_POSIX_RELEASE_GATHER_OPCODE_BCAST);
        if (mpi_errno) {
            /* for communication errors, just record the error but continue */
            *errflag =
                MPIX_ERR_PROC_FAILED ==
                MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
            MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
            MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
        }

        /* The gather step moves no data here (NULL buffers, zero count) */
        mpi_errno =
            MPIDI_POSIX_mpi_release_gather_gather(NULL, NULL, 0, MPI_DATATYPE_NULL,
                                                  MPI_OP_NULL, root, comm_ptr, errflag,
                                                  MPIDI_POSIX_RELEASE_GATHER_OPCODE_BCAST);
        if (mpi_errno) {
            /* for communication errors, just record the error but continue */
            *errflag =
                MPIX_ERR_PROC_FAILED ==
                MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
            MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
            MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
        }
        offset += chunk_count;
    }

    if (!is_contig) {
        if (my_rank != root) {
            /* Non-root unpack the data if expecting non-contiguous datatypes */
            position = 0;
            mpi_errno =
                MPIR_Unpack_impl(buffer, count, &position, ori_buffer, ori_count, ori_datatype);
            if (mpi_errno) {
                /* for communication errors, just record the error but continue */
                *errflag = MPIR_ERR_OTHER;
                MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
                MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
            }
        }
        MPL_free(buffer);
    }

    /* Each step above only records its error and continues, so mpi_errno holds just the
     * status of the last step. Return the accumulated error instead so that a failure in an
     * earlier chunk is not lost. */
    if (mpi_errno_ret) {
        mpi_errno = mpi_errno_ret;
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_POSIX_MPI_BCAST_RELEASE_GATHER);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#endif /* POSIX_COLL_RELEASE_GATHER_H_INCLUDED */
21 changes: 20 additions & 1 deletion src/mpid/ch4/shm/posix/posix_coll_select.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,27 @@ MPIDI_POSIX_coll_algo_container_t *MPIDI_POSIX_Bcast_select(void *buffer,
int nbytes = 0;
MPI_Aint type_size = 0;

MPIR_Datatype_get_size_macro(datatype, type_size);
if (MPIDI_POSIX_Bcast_algo_choice == MPIDI_POSIX_Bcast_intra_release_gather_id) {
/* release_gather based algorithm can be used only if izem submodule is built (and enabled)
* and MPICH is not multi-threaded */
#ifdef ENABLE_IZEM_ATOMIC
#ifdef MPICH_IS_THREADED
if (!MPIR_ThreadInfo.isThreaded) {
/* MPICH configured with threading support but not actually used */
return &MPIDI_POSIX_Bcast_intra_release_gather_cnt;
}
#else
/* MPICH not configured with threading support */
return &MPIDI_POSIX_Bcast_intra_release_gather_cnt;
#endif /* MPICH_IS_THREADED */
#else
/* release_gather algo is chosen through CVAR but izem is not built */
return &MPIDI_POSIX_Bcast_intra_invalid_cnt;
#endif /* ENABLE_IZEM_ATOMIC */
}

/* Choose from pt2pt based algorithms */
MPIR_Datatype_get_size_macro(datatype, type_size);
nbytes = type_size * count;

if ((nbytes < MPIR_CVAR_BCAST_SHORT_MSG_SIZE) || (comm->local_size < MPIR_CVAR_BCAST_MIN_PROCS)) {
Expand Down

0 comments on commit e546ffc

Please sign in to comment.