diff --git a/thorlcr/msort/tsorts.cpp b/thorlcr/msort/tsorts.cpp index f2603ea7c45..5f4533de69d 100644 --- a/thorlcr/msort/tsorts.cpp +++ b/thorlcr/msort/tsorts.cpp @@ -69,7 +69,7 @@ class CWriteIntercept : public CSimpleInterface CActivityBase &activity; CriticalSection crit; IThorRowInterfaces *rowIf; - Owned dataFile, idxFile; + Owned dataFile, idxFile; Owned dataFileIO, idxFileIO; Owned dataFileStream; Linked idxFileStream; @@ -91,6 +91,7 @@ class CWriteIntercept : public CSimpleInterface if (idxFileStream.get()) { idxFileStream->write(sizeof(o),&o); + idxFile->noteSize(idxFileIO->getStatistic(StSizeDiskWrite)); return; } if (lastofs) @@ -100,12 +101,13 @@ class CWriteIntercept : public CSimpleInterface // right create idx StringBuffer tempname; GetTempFilePath(tempname.clear(),"srtidx"); - idxFile.setown(createIFile(tempname.str())); - idxFileIO.setown(idxFile->open(IFOcreaterw)); + Owned tmpFile = createIFile(tempname.str()); + idxFile.setown(new CFileOwner(tmpFile, activity.queryTempFileSizeTracker())); + idxFileIO.setown(tmpFile->open(IFOcreaterw)); if (!idxFileIO.get()) { StringBuffer err; - err.append("Cannot create ").append(idxFile->queryFilename()); + err.append("Cannot create ").append(tmpFile->queryFilename()); LOG(MCerror, "%s", err.str()); throw MakeActivityException(&activity, -1, "%s", err.str()); } @@ -141,7 +143,7 @@ class CWriteIntercept : public CSimpleInterface if (!idxFileIO.get()) { assertex(idxFile); - idxFileIO.setown(idxFile->open(IFOread)); + idxFileIO.setown(idxFile->queryIFile().open(IFOread)); } size32_t rd = idxFileIO->read((offset_t)pos*(offset_t)sizeof(offset_t),sizeof(*ofs)*n,ofs); if (closeIO) @@ -161,12 +163,12 @@ class CWriteIntercept : public CSimpleInterface { if (parent->compressedOverflowFile) { - Owned iFileIO = createCompressedFileReader(parent->dataFile); + Owned iFileIO = createCompressedFileReader(&(parent->dataFile->queryIFile())); assertex(iFileIO); stream.setown(createRowStreamEx(iFileIO, parent->rowIf, startOffset, (offset_t)-1, max)); } else - stream.setown(createRowStreamEx(parent->dataFile, parent->rowIf, startOffset, (offset_t)-1, max)); + stream.setown(createRowStreamEx(&(parent->dataFile->queryIFile()), parent->rowIf, startOffset, (offset_t)-1, max)); } virtual const void *nextRow() { return stream->nextRow(); } virtual void stop() { stream->stop(); } @@ -187,16 +189,13 @@ class CWriteIntercept : public CSimpleInterface ~CWriteIntercept() { closeFiles(); - if (dataFile) - dataFile->remove(); - if (idxFile) - idxFile->remove(); } offset_t write(IRowStream *input) { StringBuffer tempname; GetTempFilePath(tempname,"srtmrg"); - dataFile.setown(createIFile(tempname.str())); + Owned tmpFile = createIFile(tempname.str()); + dataFile.setown(new CFileOwner(tmpFile, activity.queryTempFileSizeTracker())); unsigned rwFlags = DEFAULT_RWFLAGS; size32_t compBlkSz = 0; @@ -227,7 +226,7 @@ class CWriteIntercept : public CSimpleInterface } } - Owned output = createRowWriter(dataFile, rowIf, rwFlags, nullptr, compBlkSz); + Owned output = createRowWriter(&(dataFile->queryIFile()), rowIf, rwFlags, nullptr, compBlkSz); bool overflowed = false; ActPrintLog(&activity, "Local Overflow Merge start"); @@ -262,6 +261,7 @@ class CWriteIntercept : public CSimpleInterface output->flush(); offset_t end = output->getPosition(); output.clear(); + dataFile->noteSize(dataFileIO->getStatistic(StSizeDiskWrite)); writeidxofs(end); if (idxFileIO) { @@ -271,7 +271,7 @@ class CWriteIntercept : public CSimpleInterface } if (overflowed) IWARNLOG("Overflowed by %" I64F "d", overflowsize); - ActPrintLog(&activity, "Local Overflow Merge done: overflow file '%s', size = %" I64F "d", dataFile->queryFilename(), dataFile->size()); + ActPrintLog(&activity, "Local Overflow Merge done: overflow file '%s', size = %" I64F "d", dataFile->queryIFile().queryFilename(), dataFile->queryIFile().size()); return end; } IRowStream *getStream(offset_t startOffset, rowcount_t max) @@ -299,7 +299,7 @@ class CWriteIntercept : public CSimpleInterface size32_t idxSz = (size32_t)(ofs[1]-ofs[0]); if (!dataFileIO) { - dataFileIO.setown(dataFile->open(IFOread)); + dataFileIO.setown(dataFile->queryIFile().open(IFOread)); if (compressedOverflowFile) { dataFileIO.setown(createCompressedFileReader(dataFileIO)); diff --git a/thorlcr/thorutil/thmem.cpp b/thorlcr/thorutil/thmem.cpp index d866d39a564..f2dbe309d6e 100644 --- a/thorlcr/thorutil/thmem.cpp +++ b/thorlcr/thorutil/thmem.cpp @@ -246,7 +246,7 @@ class CSpillableStreamBase : public CSpillable VStringBuffer tempPrefix("streamspill_%d", activity.queryId()); GetTempFilePath(tempName, tempPrefix.str()); OwnedIFile iFile = createIFile(tempName.str()); - spillFile.setown(new CFileOwner(iFile.getLink(), activity.queryTempFileSizeTracker())); + spillFile.setown(new CFileOwner(iFile, activity.queryTempFileSizeTracker())); VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority); rows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows @@ -1660,7 +1660,7 @@ class CThorRowCollectorBase : public CSpillable Owned iFile = createIFile(tempName.str()); VStringBuffer spillPrefixStr("%sRowCollector(%d)", tracingPrefix.str(), spillPriority); spillableRows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows - Owned tempFileOwner = new CFileOwner(iFile.getLink(), tempFileSizeTracker); + Owned tempFileOwner = new CFileOwner(iFile, tempFileSizeTracker); spillFiles.append(tempFileOwner.getLink()); ++overflowCount; statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index a97fc94c5f8..c6de269762e 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -73,7 +73,7 @@ Owned globals; static Owned ClusterMPAllocator; // stat. mappings shared between master and slave activities -const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile}); +const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk}); const StatisticsMapping soapcallStatistics({StTimeSoapcall}); const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 2bc8f0c776d..9faad9556ae 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -356,9 +356,14 @@ class graph_decl CFileOwner : public CSimpleInterface, implements IInterface } void noteSize(offset_t size) { - fileSize = size; - if (fileSizeTracker) - fileSizeTracker->growSize(fileSize); + if (fileSizeTracker && fileSize!=size) + { + if (size > fileSize) + fileSizeTracker->growSize(size-fileSize); + else + fileSizeTracker->shrinkSize(fileSize-size); + fileSize = size; + } } IFile &queryIFile() const { return *iFile; } };