From 59a2343fa1e87a5490f155c99b1f9147e2b01f72 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 23 Apr 2024 14:51:33 +0100 Subject: [PATCH] HPCC-31648 New StSizePeakEphemeralStorage and StSizePeakSubgraphTemp for sort Signed-off-by: Shamser Ahmed --- thorlcr/thorutil/thmem.cpp | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/thorlcr/thorutil/thmem.cpp b/thorlcr/thorutil/thmem.cpp index 65d11882106..1e625ccc918 100644 --- a/thorlcr/thorutil/thmem.cpp +++ b/thorlcr/thorutil/thmem.cpp @@ -233,7 +233,7 @@ class CSpillableStreamBase : public CSpillable EmptyRowSemantics emptyRowSemantics; unsigned spillCompInfo; CThorSpillableRowArray rows; - OwnedIFile spillFile; + Owned spillFile; bool spillRows() { @@ -245,11 +245,13 @@ class CSpillableStreamBase : public CSpillable StringBuffer tempName; VStringBuffer tempPrefix("streamspill_%d", activity.queryId()); GetTempFilePath(tempName, tempPrefix.str()); - spillFile.setown(createIFile(tempName.str())); + OwnedIFile iFile = createIFile(tempName.str()); + spillFile.setown(new CFileOwner(iFile.getLink(), activity.queryTempFileSizeTracker())); VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority); - rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows + rows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded + spillFile->noteSpill(iFile->size()); return true; } public: @@ -264,8 +266,6 @@ class CSpillableStreamBase : public CSpillable ~CSpillableStreamBase() { ensureSpillingCallbackRemoved(); - if (spillFile) - spillFile->remove(); } // IBufferedRowCallback virtual bool freeBufferedRows(bool critical) override @@ -338,7 +338,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase block.clearCB = true; assertex(((offset_t)-1) != outputOffset); unsigned rwFlags = DEFAULT_RWFLAGS | mapESRToRWFlags(owner->emptyRowSemantics); - spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags)); + spillStream.setown(::createRowStreamEx(&(owner->spillFile->queryIFile()), owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags)); owner->rows.unregisterWriteCallback(*this); // no longer needed ret = spillStream->nextRow(); } @@ -389,7 +389,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase { block.clearCB = true; unsigned rwFlags = DEFAULT_RWFLAGS | mapESRToRWFlags(emptyRowSemantics); - return ::createRowStream(spillFile, rowIf, rwFlags); + return ::createRowStream(&spillFile->queryIFile(), rowIf, rwFlags); } rowidx_t toRead = rows.numCommitted(); if (toRead) @@ -450,7 +450,7 @@ class CSpillableStream : public CSpillableStreamBase, implements IRowStream rwFlags |= spillCompInfo; } rwFlags |= mapESRToRWFlags(emptyRowSemantics); - spillStream.setown(createRowStream(spillFile, rowIf, rwFlags)); + spillStream.setown(createRowStream(&spillFile->queryIFile(), rowIf, rwFlags)); ReleaseThorRow(readRows); readRows = nullptr; return spillStream->nextRow(); @@ -1619,6 +1619,7 @@ class CThorRowCollectorBase : public CSpillable protected: CThorSpillableRowArray spillableRows; IPointerArrayOf spillFiles; + Linked tempFileSizeTracker; Owned serializer; RowCollectorSpillFlags diskMemMix; rowcount_t totalRows = 0; @@ -1659,10 +1660,13 @@ class CThorRowCollectorBase : public CSpillable Owned iFile = createIFile(tempName.str()); VStringBuffer spillPrefixStr("%sRowCollector(%d)", tracingPrefix.str(), spillPriority); spillableRows.save(*iFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows - spillFiles.append(new CFileOwner(iFile.getLink())); + Owned tempFileOwner = new CFileOwner(iFile.getLink(), tempFileSizeTracker); + spillFiles.append(tempFileOwner.getLink()); ++overflowCount; statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class - statSizeSpill.fastAdd(iFile->size()); + offset_t tempFileSize = iFile->size(); + statSizeSpill.fastAdd(tempFileSize); + tempFileOwner->noteSpill(tempFileSize); statSpillCycles.fastAdd(spillTimer.elapsedCycles()); return true; } @@ -1837,7 +1841,7 @@ class CThorRowCollectorBase : public CSpillable CThorRowCollectorBase(CActivityBase &_activity, IThorRowInterfaces *_rowIf, ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority) : CSpillable(_activity, _rowIf, _spillPriority), iCompare(_iCompare), stableSort(_stableSort), diskMemMix(_diskMemMix), - spillableRows(_activity) + spillableRows(_activity), tempFileSizeTracker(_activity.queryTempFileSizeTracker()) { if (rc_allMem == diskMemMix) spillPriority = SPILL_PRIORITY_DISABLE; // all mem, implies no spilling