Skip to content

Commit

Permalink
HPCC-31648 New StSizePeakEphemeralStorage and StSizePeakSubgraphTemp …
Browse files Browse the repository at this point in the history
…for sort

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Apr 25, 2024
1 parent 1a12426 commit 59a2343
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions thorlcr/thorutil/thmem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class CSpillableStreamBase : public CSpillable
EmptyRowSemantics emptyRowSemantics;
unsigned spillCompInfo;
CThorSpillableRowArray rows;
OwnedIFile spillFile;
Owned<CFileOwner> spillFile;

bool spillRows()
{
Expand All @@ -245,11 +245,13 @@ class CSpillableStreamBase : public CSpillable
StringBuffer tempName;
VStringBuffer tempPrefix("streamspill_%d", activity.queryId());
GetTempFilePath(tempName, tempPrefix.str());
spillFile.setown(createIFile(tempName.str()));
OwnedIFile iFile = createIFile(tempName.str());
spillFile.setown(new CFileOwner(iFile.getLink(), activity.queryTempFileSizeTracker()));

VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority);
rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
rows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
spillFile->noteSpill(iFile->size());
return true;
}
public:
Expand All @@ -264,8 +266,6 @@ class CSpillableStreamBase : public CSpillable
~CSpillableStreamBase()
{
ensureSpillingCallbackRemoved();
if (spillFile)
spillFile->remove();
}
// IBufferedRowCallback
virtual bool freeBufferedRows(bool critical) override
Expand Down Expand Up @@ -338,7 +338,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase
block.clearCB = true;
assertex(((offset_t)-1) != outputOffset);
unsigned rwFlags = DEFAULT_RWFLAGS | mapESRToRWFlags(owner->emptyRowSemantics);
spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
spillStream.setown(::createRowStreamEx(&(owner->spillFile->queryIFile()), owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
owner->rows.unregisterWriteCallback(*this); // no longer needed
ret = spillStream->nextRow();
}
Expand Down Expand Up @@ -389,7 +389,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase
{
block.clearCB = true;
unsigned rwFlags = DEFAULT_RWFLAGS | mapESRToRWFlags(emptyRowSemantics);
return ::createRowStream(spillFile, rowIf, rwFlags);
return ::createRowStream(&spillFile->queryIFile(), rowIf, rwFlags);
}
rowidx_t toRead = rows.numCommitted();
if (toRead)
Expand Down Expand Up @@ -450,7 +450,7 @@ class CSpillableStream : public CSpillableStreamBase, implements IRowStream
rwFlags |= spillCompInfo;
}
rwFlags |= mapESRToRWFlags(emptyRowSemantics);
spillStream.setown(createRowStream(spillFile, rowIf, rwFlags));
spillStream.setown(createRowStream(&spillFile->queryIFile(), rowIf, rwFlags));
ReleaseThorRow(readRows);
readRows = nullptr;
return spillStream->nextRow();
Expand Down Expand Up @@ -1619,6 +1619,7 @@ class CThorRowCollectorBase : public CSpillable
protected:
CThorSpillableRowArray spillableRows;
IPointerArrayOf<CFileOwner> spillFiles;
Linked<CFileSizeTracker> tempFileSizeTracker;
Owned<IOutputRowSerializer> serializer;
RowCollectorSpillFlags diskMemMix;
rowcount_t totalRows = 0;
Expand Down Expand Up @@ -1659,10 +1660,13 @@ class CThorRowCollectorBase : public CSpillable
Owned<IFile> iFile = createIFile(tempName.str());
VStringBuffer spillPrefixStr("%sRowCollector(%d)", tracingPrefix.str(), spillPriority);
spillableRows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
spillFiles.append(new CFileOwner(iFile.getLink()));
Owned<CFileOwner> tempFileOwner = new CFileOwner(iFile.getLink(), tempFileSizeTracker);
spillFiles.append(tempFileOwner.getLink());
++overflowCount;
statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class
statSizeSpill.fastAdd(iFile->size());
offset_t tempFileSize = iFile->size();
statSizeSpill.fastAdd(tempFileSize);
tempFileOwner->noteSpill(tempFileSize);
statSpillCycles.fastAdd(spillTimer.elapsedCycles());
return true;
}
Expand Down Expand Up @@ -1837,7 +1841,7 @@ class CThorRowCollectorBase : public CSpillable
CThorRowCollectorBase(CActivityBase &_activity, IThorRowInterfaces *_rowIf, ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
: CSpillable(_activity, _rowIf, _spillPriority),
iCompare(_iCompare), stableSort(_stableSort), diskMemMix(_diskMemMix),
spillableRows(_activity)
spillableRows(_activity), tempFileSizeTracker(_activity.queryTempFileSizeTracker())
{
if (rc_allMem == diskMemMix)
spillPriority = SPILL_PRIORITY_DISABLE; // all mem, implies no spilling
Expand Down

0 comments on commit 59a2343

Please sign in to comment.