Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to check bytes accessed by MPIIO xfers and retry if possible #501

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 126 additions & 81 deletions src/aiori-MPIIO.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ static IOR_offset_t MPIIO_Xfer(int access, aiori_fd_t * fdp, IOR_size_t * buffer
* MPI_Datatype, MPI_Status *);
*/
MPI_Status status;
MPI_Count elementsAccessed;
MPI_Count elementSize;
IOR_offset_t xferBytes, expectedBytes;
int retryAccess = 1, xferRetries = 0;

/* point functions to appropriate MPIIO calls */
if (access == WRITE) { /* WRITE */
Expand Down Expand Up @@ -443,100 +447,141 @@ static IOR_offset_t MPIIO_Xfer(int access, aiori_fd_t * fdp, IOR_size_t * buffer
* Access_ordered = MPI_File_read_ordered;
*/
}

/*
* 'useFileView' uses derived datatypes and individual file pointers
*/
if (param->useFileView) {
/* find offset in file */
if (SeekOffset(mfd->fd, offset, module_options) <
0) {
/* if unsuccessful */
length = -1;
} else {

/*
* 'useStridedDatatype' fits multi-strided pattern into a datatype;
* must use 'length' to determine repetitions (fix this for
* multi-segments someday, WEL):
* e.g., 'IOR -s 2 -b 32K -t 32K -a MPIIO --mpiio.useStridedDatatype --mpiio.useFileView'
*/
if (param->useStridedDatatype) {
if(offset >= (rank+1) * hints->blockSize){
/* we shall write only once per transferSize */
/* printf("FAKE access %d %lld\n", rank, offset); */
return hints->transferSize;
}
length = hints->segmentCount;
MPI_CHECK(MPI_File_set_view(mfd->fd, offset,
mfd->contigType,
mfd->fileType,
"native",
(MPI_Info) MPI_INFO_NULL), "cannot set file view");
/* printf("ACCESS %d %lld -> %lld\n", rank, offset, length); */
}else{
length = 1;
}
if (hints->collective) {
/* individual, collective call */
MPI_CHECK(Access_all
(mfd->fd, buffer, length,
mfd->transferType, &status),
"cannot access collective");
} else {
/* individual, noncollective call */
MPI_CHECK(Access
(mfd->fd, buffer, length,
mfd->transferType, &status),
"cannot access noncollective");
}
/* MPI-IO driver does "nontcontiguous" by transfering
* 'segment' regions of 'transfersize' bytes, but
* our caller WriteOrReadSingle does not know how to
* deal with us reporting that we wrote N times more
* data than requested. */
length = hints->transferSize;
}
} else {
while( retryAccess ) {
/*
* !useFileView does not use derived datatypes, but it uses either
* shared or explicit file pointers
* 'useFileView' uses derived datatypes and individual file pointers
*/
if (param->useSharedFilePointer) {
if (param->useFileView) {
/* find offset in file */
if (SeekOffset
(mfd->fd, offset, module_options) < 0) {
if (SeekOffset(mfd->fd, offset, module_options) <
0) {
/* if unsuccessful */
length = -1;
retryAccess = 0; // don't retry
} else {
/* shared, collective call */

/*
* this needs to be properly implemented:
*
* MPI_CHECK(Access_ordered(fd.MPIIO, buffer, length,
* MPI_BYTE, &status),
* "cannot access shared, collective");
*/
fprintf(stdout,
"useSharedFilePointer not implemented\n");
* 'useStridedDatatype' fits multi-strided pattern into a datatype;
* must use 'length' to determine repetitions (fix this for
* multi-segments someday, WEL):
* e.g., 'IOR -s 2 -b 32K -t 32K -a MPIIO --mpiio.useStridedDatatype --mpiio.useFileView'
*/
MPI_CHECK(MPI_Type_size_x(mfd->transferType, &elementSize), "couldn't get size of type");
if (param->useStridedDatatype) {
if(offset >= (rank+1) * hints->blockSize){
/* we shall write only once per transferSize */
/* printf("FAKE access %d %lld\n", rank, offset); */
return hints->transferSize;
}
length = hints->segmentCount;
MPI_CHECK(MPI_File_set_view(mfd->fd, offset,
mfd->contigType,
mfd->fileType,
"native",
(MPI_Info) MPI_INFO_NULL), "cannot set file view");
expectedBytes = hints->segmentCount * elementSize;
/* printf("ACCESS %d %lld -> %lld\n", rank, offset, length); */
/* get the size of transferType; length is overloaded and used
* set to number of segments */
}else{
length = 1;
expectedBytes = elementSize;
}
if (hints->collective) {
/* individual, collective call */
MPI_CHECK(Access_all
(mfd->fd, buffer, length,
mfd->transferType, &status),
"cannot access collective");
} else {
/* individual, noncollective call */
MPI_CHECK(Access
(mfd->fd, buffer, length,
mfd->transferType, &status),
"cannot access noncollective");
}
/* MPI-IO driver does "nontcontiguous" by transfering
* 'segment' regions of 'transfersize' bytes, but
* our caller WriteOrReadSingle does not know how to
* deal with us reporting that we wrote N times more
* data than requested. */
length = hints->transferSize;
MPI_CHECK(MPI_Get_elements_x(&status, MPI_INT, &elementsAccessed),
"can't get elements accessed" );
xferBytes = elementsAccessed * sizeof(MPI_BYTE);
}
} else {
if (hints->collective) {
/* explicit, collective call */
MPI_CHECK(Access_at_all
(mfd->fd, offset,
buffer, length, MPI_BYTE, &status),
"cannot access explicit, collective");
/*
* !useFileView does not use derived datatypes, but it uses either
* shared or explicit file pointers
*/
if (param->useSharedFilePointer) {
/* find offset in file */
if (SeekOffset
(mfd->fd, offset, module_options) < 0) {
/* if unsuccessful */
length = -1;
} else {
/* shared, collective call */
/*
* this needs to be properly implemented:
*
* MPI_CHECK(Access_ordered(fd.MPIIO, buffer, length,
* MPI_BYTE, &status),
* "cannot access shared, collective");
*/
fprintf(stdout,
"useSharedFilePointer not implemented\n");
}
} else {
/* explicit, noncollective call */
MPI_CHECK(Access_at
(mfd->fd, offset,
buffer, length, MPI_BYTE, &status),
"cannot access explicit, noncollective");
if (hints->collective) {
/* explicit, collective call */
MPI_CHECK(Access_at_all
(mfd->fd, offset,
buffer, length, MPI_BYTE, &status),
"cannot access explicit, collective");
} else {
/* explicit, noncollective call */
MPI_CHECK(Access_at
(mfd->fd, offset,
buffer, length, MPI_BYTE, &status),
"cannot access explicit, noncollective");
}
}
MPI_CHECK(MPI_Get_elements_x(&status, MPI_INT, &elementsAccessed),
"can't get elements accessed" );

expectedBytes = length;
xferBytes = elementsAccessed * sizeof(MPI_BYTE);
}

/* Retrying collective xfers would require syncing after every IO to check if any
* IOs were short and all ranks retrying. Instead, report the short access which
* will lead to an abort after returning. Otherwise, retry the IO similarly to
* what's done for POSIX except retrying the full request. expectedBytes is
* transferSize/length unless --mpiio-useStridedDatatype and it will then be
* segmentCount * transferSize */
if (xferBytes != expectedBytes ){
WARNF("task %d, partial %s, %lld of %lld bytes at offset %lld, xferRetries: %d\n",
rank,
access == WRITE ? "write()" : "read()",
xferBytes, expectedBytes,
offset, xferRetries);
/* don't allow retry if collective, return the short xfer amount now */
if (hints->collective) {
WARNF("task %d, not retrying collective %s\n", rank,
access == WRITE ? "write()" : "read()");
return xferBytes;
}
if (xferRetries++ > MAX_RETRY || hints->singleXferAttempt){
WARN("too many retries -- aborting");
return xferBytes;
}
} else {
retryAccess = 0; // expected data transferred, return normally
}
}
return hints->transferSize;
return hints->transferSize; // short xfers already returned in the expectedBytes/retry check
}

/*
Expand Down