diff --git a/src/aiori-MPIIO.c b/src/aiori-MPIIO.c index 38138657..cc8641a2 100755 --- a/src/aiori-MPIIO.c +++ b/src/aiori-MPIIO.c @@ -416,6 +416,10 @@ static IOR_offset_t MPIIO_Xfer(int access, aiori_fd_t * fdp, IOR_size_t * buffer * MPI_Datatype, MPI_Status *); */ MPI_Status status; + MPI_Count elementsAccessed; + MPI_Count elementSize; + IOR_offset_t xferBytes, expectedBytes; + int retryAccess = 1, xferRetries = 0; /* point functions to appropriate MPIIO calls */ if (access == WRITE) { /* WRITE */ @@ -443,100 +447,141 @@ static IOR_offset_t MPIIO_Xfer(int access, aiori_fd_t * fdp, IOR_size_t * buffer * Access_ordered = MPI_File_read_ordered; */ } - - /* - * 'useFileView' uses derived datatypes and individual file pointers - */ - if (param->useFileView) { - /* find offset in file */ - if (SeekOffset(mfd->fd, offset, module_options) < - 0) { - /* if unsuccessful */ - length = -1; - } else { - - /* - * 'useStridedDatatype' fits multi-strided pattern into a datatype; - * must use 'length' to determine repetitions (fix this for - * multi-segments someday, WEL): - * e.g., 'IOR -s 2 -b 32K -t 32K -a MPIIO --mpiio.useStridedDatatype --mpiio.useFileView' - */ - if (param->useStridedDatatype) { - if(offset >= (rank+1) * hints->blockSize){ - /* we shall write only once per transferSize */ - /* printf("FAKE access %d %lld\n", rank, offset); */ - return hints->transferSize; - } - length = hints->segmentCount; - MPI_CHECK(MPI_File_set_view(mfd->fd, offset, - mfd->contigType, - mfd->fileType, - "native", - (MPI_Info) MPI_INFO_NULL), "cannot set file view"); - /* printf("ACCESS %d %lld -> %lld\n", rank, offset, length); */ - }else{ - length = 1; - } - if (hints->collective) { - /* individual, collective call */ - MPI_CHECK(Access_all - (mfd->fd, buffer, length, - mfd->transferType, &status), - "cannot access collective"); - } else { - /* individual, noncollective call */ - MPI_CHECK(Access - (mfd->fd, buffer, length, - mfd->transferType, &status), - "cannot access noncollective"); - } - /* MPI-IO driver does "nontcontiguous" by transfering - * 'segment' regions of 'transfersize' bytes, but - * our caller WriteOrReadSingle does not know how to - * deal with us reporting that we wrote N times more - * data than requested. */ - length = hints->transferSize; - } - } else { + while( retryAccess ) { /* - * !useFileView does not use derived datatypes, but it uses either - * shared or explicit file pointers + * 'useFileView' uses derived datatypes and individual file pointers */ - if (param->useSharedFilePointer) { + if (param->useFileView) { /* find offset in file */ - if (SeekOffset - (mfd->fd, offset, module_options) < 0) { + if (SeekOffset(mfd->fd, offset, module_options) < + 0) { /* if unsuccessful */ length = -1; + retryAccess = 0; // don't retry } else { - /* shared, collective call */ + /* - * this needs to be properly implemented: - * - * MPI_CHECK(Access_ordered(fd.MPIIO, buffer, length, - * MPI_BYTE, &status), - * "cannot access shared, collective"); - */ - fprintf(stdout, - "useSharedFilePointer not implemented\n"); + * 'useStridedDatatype' fits multi-strided pattern into a datatype; + * must use 'length' to determine repetitions (fix this for + * multi-segments someday, WEL): + * e.g., 'IOR -s 2 -b 32K -t 32K -a MPIIO --mpiio.useStridedDatatype --mpiio.useFileView' + */ + MPI_CHECK(MPI_Type_size_x(mfd->transferType, &elementSize), "couldn't get size of type"); + if (param->useStridedDatatype) { + if(offset >= (rank+1) * hints->blockSize){ + /* we shall write only once per transferSize */ + /* printf("FAKE access %d %lld\n", rank, offset); */ + return hints->transferSize; + } + length = hints->segmentCount; + MPI_CHECK(MPI_File_set_view(mfd->fd, offset, + mfd->contigType, + mfd->fileType, + "native", + (MPI_Info) MPI_INFO_NULL), "cannot set file view"); + expectedBytes = hints->segmentCount * elementSize; + /* printf("ACCESS %d %lld -> %lld\n", rank, offset, length); */ + /* get the size of transferType; length is overloaded and used + * set to number of segments */ + }else{ + length = 1; + expectedBytes = elementSize; + } + if (hints->collective) { + /* individual, collective call */ + MPI_CHECK(Access_all + (mfd->fd, buffer, length, + mfd->transferType, &status), + "cannot access collective"); + } else { + /* individual, noncollective call */ + MPI_CHECK(Access + (mfd->fd, buffer, length, + mfd->transferType, &status), + "cannot access noncollective"); + } + /* MPI-IO driver does "nontcontiguous" by transfering + * 'segment' regions of 'transfersize' bytes, but + * our caller WriteOrReadSingle does not know how to + * deal with us reporting that we wrote N times more + * data than requested. */ + length = hints->transferSize; + MPI_CHECK(MPI_Get_elements_x(&status, MPI_INT, &elementsAccessed), + "can't get elements accessed" ); + xferBytes = elementsAccessed * sizeof(MPI_BYTE); } } else { - if (hints->collective) { - /* explicit, collective call */ - MPI_CHECK(Access_at_all - (mfd->fd, offset, - buffer, length, MPI_BYTE, &status), - "cannot access explicit, collective"); + /* + * !useFileView does not use derived datatypes, but it uses either + * shared or explicit file pointers + */ + if (param->useSharedFilePointer) { + /* find offset in file */ + if (SeekOffset + (mfd->fd, offset, module_options) < 0) { + /* if unsuccessful */ + length = -1; + } else { + /* shared, collective call */ + /* + * this needs to be properly implemented: + * + * MPI_CHECK(Access_ordered(fd.MPIIO, buffer, length, + * MPI_BYTE, &status), + * "cannot access shared, collective"); + */ + fprintf(stdout, + "useSharedFilePointer not implemented\n"); + } } else { - /* explicit, noncollective call */ - MPI_CHECK(Access_at - (mfd->fd, offset, - buffer, length, MPI_BYTE, &status), - "cannot access explicit, noncollective"); + if (hints->collective) { + /* explicit, collective call */ + MPI_CHECK(Access_at_all + (mfd->fd, offset, + buffer, length, MPI_BYTE, &status), + "cannot access explicit, collective"); + } else { + /* explicit, noncollective call */ + MPI_CHECK(Access_at + (mfd->fd, offset, + buffer, length, MPI_BYTE, &status), + "cannot access explicit, noncollective"); + } + } + MPI_CHECK(MPI_Get_elements_x(&status, MPI_INT, &elementsAccessed), + "can't get elements accessed" ); + + expectedBytes = length; + xferBytes = elementsAccessed * sizeof(MPI_BYTE); + } + + /* Retrying collective xfers would require syncing after every IO to check if any + * IOs were short and all ranks retrying. Instead, report the short access which + * will lead to an abort after returning. Otherwise, retry the IO similarly to + * what's done for POSIX except retrying the full request. expectedBytes is + * transferSize/length unless --mpiio-useStridedDatatype and it will then be + * segmentCount * transferSize */ + if (xferBytes != expectedBytes ){ + WARNF("task %d, partial %s, %lld of %lld bytes at offset %lld, xferRetries: %d\n", + rank, + access == WRITE ? "write()" : "read()", + xferBytes, expectedBytes, + offset, xferRetries); + /* don't allow retry if collective, return the short xfer amount now */ + if (hints->collective) { + WARNF("task %d, not retrying collective %s\n", rank, + access == WRITE ? "write()" : "read()"); + return xferBytes; + } + if (xferRetries++ > MAX_RETRY || hints->singleXferAttempt){ + WARN("too many retries -- aborting"); + return xferBytes; } + } else { + retryAccess = 0; // expected data transferred, return normally } } - return hints->transferSize; + return hints->transferSize; // short xfers already returned in the expectedBytes/retry check } /*