Skip to content

Commit

Permalink
HPCC-31648 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jun 11, 2024
1 parent 687e468 commit 4949830
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 21 deletions.
30 changes: 15 additions & 15 deletions thorlcr/msort/tsorts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class CWriteIntercept : public CSimpleInterface
CActivityBase &activity;
CriticalSection crit;
IThorRowInterfaces *rowIf;
Owned<IFile> dataFile, idxFile;
Owned<CFileOwner> dataFile, idxFile;
Owned<IFileIO> dataFileIO, idxFileIO;
Owned<ISerialStream> dataFileStream;
Linked<IFileIOStream> idxFileStream;
Expand All @@ -91,6 +91,7 @@ class CWriteIntercept : public CSimpleInterface
if (idxFileStream.get())
{
idxFileStream->write(sizeof(o),&o);
idxFile->noteSize(idxFileIO->getStatistic(StSizeDiskWrite));
return;
}
if (lastofs)
Expand All @@ -100,12 +101,13 @@ class CWriteIntercept : public CSimpleInterface
// right create idx
StringBuffer tempname;
GetTempFilePath(tempname.clear(),"srtidx");
idxFile.setown(createIFile(tempname.str()));
idxFileIO.setown(idxFile->open(IFOcreaterw));
Owned<IFile> tmpFile = createIFile(tempname.str());
idxFile.setown(new CFileOwner(tmpFile, activity.queryTempFileSizeTracker()));
idxFileIO.setown(tmpFile->open(IFOcreaterw));
if (!idxFileIO.get())
{
StringBuffer err;
err.append("Cannot create ").append(idxFile->queryFilename());
err.append("Cannot create ").append(tmpFile->queryFilename());
LOG(MCerror, "%s", err.str());
throw MakeActivityException(&activity, -1, "%s", err.str());
}
Expand Down Expand Up @@ -141,7 +143,7 @@ class CWriteIntercept : public CSimpleInterface
if (!idxFileIO.get())
{
assertex(idxFile);
idxFileIO.setown(idxFile->open(IFOread));
idxFileIO.setown(idxFile->queryIFile().open(IFOread));
}
size32_t rd = idxFileIO->read((offset_t)pos*(offset_t)sizeof(offset_t),sizeof(*ofs)*n,ofs);
if (closeIO)
Expand All @@ -161,12 +163,12 @@ class CWriteIntercept : public CSimpleInterface
{
if (parent->compressedOverflowFile)
{
Owned<ICompressedFileIO> iFileIO = createCompressedFileReader(parent->dataFile);
Owned<ICompressedFileIO> iFileIO = createCompressedFileReader(&(parent->dataFile->queryIFile()));
assertex(iFileIO);
stream.setown(createRowStreamEx(iFileIO, parent->rowIf, startOffset, (offset_t)-1, max));
}
else
stream.setown(createRowStreamEx(parent->dataFile, parent->rowIf, startOffset, (offset_t)-1, max));
stream.setown(createRowStreamEx(&(parent->dataFile->queryIFile()), parent->rowIf, startOffset, (offset_t)-1, max));
}
virtual const void *nextRow() { return stream->nextRow(); }
virtual void stop() { stream->stop(); }
Expand All @@ -187,16 +189,13 @@ class CWriteIntercept : public CSimpleInterface
~CWriteIntercept()
{
closeFiles();
if (dataFile)
dataFile->remove();
if (idxFile)
idxFile->remove();
}
offset_t write(IRowStream *input)
{
StringBuffer tempname;
GetTempFilePath(tempname,"srtmrg");
dataFile.setown(createIFile(tempname.str()));
Owned<IFile> tmpFile = createIFile(tempname.str());
dataFile.setown(new CFileOwner(tmpFile, activity.queryTempFileSizeTracker()));

unsigned rwFlags = DEFAULT_RWFLAGS;
size32_t compBlkSz = 0;
Expand Down Expand Up @@ -227,7 +226,7 @@ class CWriteIntercept : public CSimpleInterface
}
}

Owned<IExtRowWriter> output = createRowWriter(dataFile, rowIf, rwFlags, nullptr, compBlkSz);
Owned<IExtRowWriter> output = createRowWriter(&(dataFile->queryIFile()), rowIf, rwFlags, nullptr, compBlkSz);

bool overflowed = false;
ActPrintLog(&activity, "Local Overflow Merge start");
Expand Down Expand Up @@ -262,6 +261,7 @@ class CWriteIntercept : public CSimpleInterface
output->flush();
offset_t end = output->getPosition();
output.clear();
dataFile->noteSize(dataFileIO->getStatistic(StSizeDiskWrite));
writeidxofs(end);
if (idxFileIO)
{
Expand All @@ -271,7 +271,7 @@ class CWriteIntercept : public CSimpleInterface
}
if (overflowed)
IWARNLOG("Overflowed by %" I64F "d", overflowsize);
ActPrintLog(&activity, "Local Overflow Merge done: overflow file '%s', size = %" I64F "d", dataFile->queryFilename(), dataFile->size());
ActPrintLog(&activity, "Local Overflow Merge done: overflow file '%s', size = %" I64F "d", dataFile->queryIFile().queryFilename(), dataFile->queryIFile().size());
return end;
}
IRowStream *getStream(offset_t startOffset, rowcount_t max)
Expand Down Expand Up @@ -299,7 +299,7 @@ class CWriteIntercept : public CSimpleInterface
size32_t idxSz = (size32_t)(ofs[1]-ofs[0]);
if (!dataFileIO)
{
dataFileIO.setown(dataFile->open(IFOread));
dataFileIO.setown(dataFile->queryIFile().open(IFOread));
if (compressedOverflowFile)
{
dataFileIO.setown(createCompressedFileReader(dataFileIO));
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/thorutil/thmem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class CSpillableStreamBase : public CSpillable
VStringBuffer tempPrefix("streamspill_%d", activity.queryId());
GetTempFilePath(tempName, tempPrefix.str());
OwnedIFile iFile = createIFile(tempName.str());
spillFile.setown(new CFileOwner(iFile.getLink(), activity.queryTempFileSizeTracker()));
spillFile.setown(new CFileOwner(iFile, activity.queryTempFileSizeTracker()));

VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority);
rows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
Expand Down Expand Up @@ -1660,7 +1660,7 @@ class CThorRowCollectorBase : public CSpillable
Owned<IFile> iFile = createIFile(tempName.str());
VStringBuffer spillPrefixStr("%sRowCollector(%d)", tracingPrefix.str(), spillPriority);
spillableRows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
Owned<CFileOwner> tempFileOwner = new CFileOwner(iFile.getLink(), tempFileSizeTracker);
Owned<CFileOwner> tempFileOwner = new CFileOwner(iFile, tempFileSizeTracker);
spillFiles.append(tempFileOwner.getLink());
++overflowCount;
statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Owned<IPropertyTree> globals;
static Owned<IMPtagAllocator> ClusterMPAllocator;

// stat. mappings shared between master and slave activities
const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile});
const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk});
const StatisticsMapping soapcallStatistics({StTimeSoapcall});
const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked});
const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
Expand Down
11 changes: 8 additions & 3 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,14 @@ class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
}
void noteSize(offset_t size)
{
fileSize = size;
if (fileSizeTracker)
fileSizeTracker->growSize(fileSize);
if (fileSizeTracker && fileSize!=size)
{
if (size > fileSize)
fileSizeTracker->growSize(size-fileSize);
else
fileSizeTracker->shrinkSize(fileSize-size);
fileSize = size;
}
}
IFile &queryIFile() const { return *iFile; }
};
Expand Down

0 comments on commit 4949830

Please sign in to comment.