From 5aaba8c39da4e827bbe84e2d960161ce0b8be60a Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Wed, 3 Jul 2024 12:04:04 +0100 Subject: [PATCH] HPCC-32193 Fix some issues with spill stats in smart join activity The current temp file statistics for smartjoin did not include all stats from all temp files: 1) temp files were closed before its sizes were recorded in the stats 2) stats from some types of temp files were not being tracked such as overflowWriteFile from RHS 3) stats from temp files that were closed in CSpillableStreamBase were not preserved 4) peak temp file size was not tracked in CThorSpillableRowArray 5) make CThorRowCollectorBase use of stats from CThorSpillableRowArray for more accurate and simpler temp stats tracking. Signed-off-by: Shamser Ahmed --- .../lookupjoin/thlookupjoinslave.cpp | 35 ++++++++++++-- thorlcr/thorutil/thmem.cpp | 47 ++++++++----------- thorlcr/thorutil/thmem.hpp | 8 +++- 3 files changed, 56 insertions(+), 34 deletions(-) diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 0441534af72..2d99be825ec 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -1092,7 +1092,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, { CThorSpillableRowArray *rows = rhsSlaveRows.item(a); if (rows) + { + mergeStats(inactiveStats, rows); rows->kill(); + } } rhs.kill(); } @@ -1808,6 +1811,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase overflowWriteFile; Owned overflowWriteStream; + OwnedIFileIO overflowWriteFileIO; rowcount_t overflowWriteCount; OwnedMalloc channelDistributors; unsigned nextRhsToSpill = 0; @@ -2096,8 +2100,14 @@ class CLookupJoinActivityBase : public CInMemJoinBasenoteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); + overflowWriteFileIO.clear(); + } + overflowWriteFile.clear(); rightRowManager->addRowBuffer(this); } doBroadcastRHS(stopping); @@ -2156,6 +2166,7 @@ class CLookupJoinActivityBase : public CInMemJoinBaseinit(left, right, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL)); + mergeStats(PARENT::inactiveStats, rowLoader); } void getRHS(bool stopping) { @@ -2678,8 +2690,12 @@ class CLookupJoinActivityBase : public CInMemJoinBasequeryFromActivity()->queryContainer().queryHelper()->queryOutputMeta(); // rows may either be in separate slave row arrays or in single rhs array, or split. rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality()); + if (rightCollector && rightCollector->hasSpilt()) + mergeStats(PARENT::inactiveStats, rightCollector); throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL); } + if (rightCollector && rightCollector->hasSpilt()) + mergeStats(PARENT::inactiveStats, rightCollector); } public: static bool needDedup(IHThorHashJoinArg *helper) @@ -2943,7 +2959,7 @@ class CLookupJoinActivityBase : public CInMemJoinBaseputRow(rhsInRowsTemp.getClear(r)); - overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite)); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); return true; } if (hasFailedOverToLocal()) @@ -3001,12 +3017,13 @@ class CLookupJoinActivityBase : public CInMemJoinBasecreateOwnedTempFile(tempFilename.str())); - overflowWriteStream.setown(createRowWriter(&(overflowWriteFile->queryIFile()), queryRowInterfaces(rightITDL), rwFlags)); + overflowWriteFileIO.setown(overflowWriteFile->queryIFile().open(IFOcreate)); + overflowWriteStream.setown(createRowWriter(overflowWriteFileIO, queryRowInterfaces(rightITDL), rwFlags)); overflowWriteCount += rhsInRowsTemp.ordinality(); ForEachItemIn(r, rhsInRowsTemp) overflowWriteStream->putRow(rhsInRowsTemp.getClear(r)); - overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite)); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); return true; } virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const @@ -3018,6 +3035,13 @@ class CLookupJoinActivityBase : public CInMemJoinBase ForEachItemIn(a, rhsSlaveRows) { CThorSpillableRowArray &rows = *rhsSlaveRows.item(a); + mergeStats(PARENT::inactiveStats, &rows); rhs.appendRows(rows, true); rows.kill(); // free up ptr table asap } diff --git a/thorlcr/thorutil/thmem.cpp b/thorlcr/thorutil/thmem.cpp index 6f2c7e4fcfd..9845fc6e35a 100644 --- a/thorlcr/thorutil/thmem.cpp +++ b/thorlcr/thorutil/thmem.cpp @@ -234,6 +234,7 @@ class CSpillableStreamBase : public CSpillable unsigned spillCompInfo; CThorSpillableRowArray rows; Owned spillFile; + CRuntimeStatisticCollection inactiveStats; bool spillRows() { @@ -248,13 +249,13 @@ class CSpillableStreamBase : public CSpillable spillFile.setown(activity.createOwnedTempFile(tempName.str())); VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority); rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows + mergeStats(inactiveStats, &rows); rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded - spillFile->noteSize(spillFile->queryIFile().size()); return true; } public: CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority) - : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics) + : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics), inactiveStats(spillStatistics) { assertex(inRows.isFlushed()); spillCompInfo = 0x0; @@ -265,6 +266,10 @@ class CSpillableStreamBase : public CSpillable { ensureSpillingCallbackRemoved(); } + unsigned __int64 getStatistic(StatisticKind kind) const + { + return inactiveStats.getStatisticValue(kind); + } // IBufferedRowCallback virtual bool freeBufferedRows(bool critical) override { @@ -1328,13 +1333,13 @@ void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb) } CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity) - : CThorExpandingRowArray(activity) + : CThorExpandingRowArray(activity), inactiveStats(spillStatistics) { throwOnOom = false; } CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta) - : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta) + : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta), inactiveStats(spillStatistics) { } @@ -1363,6 +1368,7 @@ void CThorSpillableRowArray::kill() { clearRows(); CThorExpandingRowArray::kill(); + inactiveStats.reset(); } void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores) @@ -1413,7 +1419,8 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom nextCB = &cbCopy.popGet(); nextCBI = nextCB->queryRecordNumber(); } - Owned writer = createRowWriter(&iFileOwner.queryIFile(), rowIf, rwFlags, nullptr, compBlkSz); + OwnedIFileIO iFileIO = iFileOwner.queryIFile().open(IFOcreate); + Owned writer = createRowWriter(iFileIO, rowIf, rwFlags, nullptr, compBlkSz); rowidx_t i=0; rowidx_t rowsWritten=0; try @@ -1452,7 +1459,6 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom ++i; } writer->flush(NULL); - iFileOwner.noteSize(writer->getStatistic(StSizeDiskWrite)); } catch (IException *e) { @@ -1463,6 +1469,10 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom firstRow += n; offset_t bytesWritten = writer->getPosition(); writer.clear(); + mergeRemappedStats(inactiveStats, iFileIO, diskToTempStatsMap); + offset_t sizeTempFile = iFileIO->getStatistic(StSizeDiskWrite); + iFileOwner.noteSize(sizeTempFile); + inactiveStats.addStatistic(StNumSpills, 1); ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u, firstRow = %u", _tracingPrefix, rowsWritten, (__int64)bytesWritten, firstRow); return rowsWritten; } @@ -1638,11 +1648,8 @@ class CThorRowCollectorBase : public CSpillable Owned spillableRowSet; unsigned options = 0; unsigned spillCompInfo = 0; - RelaxedAtomic statOverflowCount{0}; RelaxedAtomic statSizeSpill{0}; - RelaxedAtomic<__uint64> statSpillCycles{0}; RelaxedAtomic<__uint64> statSortCycles{0}; - bool spillRows(bool critical) { //This must only be called while a lock is held on spillableRows @@ -1668,11 +1675,6 @@ class CThorRowCollectorBase : public CSpillable spillableRows.save(*tempFileOwner, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows spillFiles.append(tempFileOwner.getLink()); ++overflowCount; - statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class - offset_t tempFileSize = tempFileOwner->queryIFile().size(); - statSizeSpill.fastAdd(tempFileSize); - tempFileOwner->noteSize(tempFileSize); - statSpillCycles.fastAdd(spillTimer.elapsedCycles()); return true; } void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics) @@ -1960,26 +1962,17 @@ class CThorRowCollectorBase : public CSpillable { options = _options; } - unsigned __int64 getStatistic(StatisticKind kind) + unsigned __int64 getStatistic(StatisticKind kind) const { switch (kind) { - case StCycleSpillElapsedCycles: - return statSpillCycles; case StCycleSortElapsedCycles: return statSortCycles; - case StTimeSpillElapsed: - return cycle_to_nanosec(statSpillCycles); case StTimeSortElapsed: return cycle_to_nanosec(statSortCycles); - case StNumSpills: - return statOverflowCount; - case StSizeSpillFile: - return statSizeSpill; default: - break; + return spillableRows.getStatistic(kind); } - return 0; } bool hasSpilt() const { return overflowCount >= 1; } @@ -2048,7 +2041,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader } virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); } virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); } - virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); } virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); } virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); } virtual void reset() override { CThorRowCollectorBase::reset(); } @@ -2103,7 +2096,7 @@ class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowColle } virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); } virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); } - virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); } virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); } virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); } // IThorArrayLock diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index 8e4f1b896a8..bdcda28efbf 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -413,7 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem mutable CriticalSection cs; ICopyArrayOf writeCallbacks; size32_t compBlkSz = 0; // means use default - + CRuntimeStatisticCollection inactiveStats; // reset after each kill bool _flush(bool force); void doFlush(); inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); } @@ -484,6 +484,10 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe! inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommited rows + inline unsigned __int64 getStatistic(StatisticKind kind) const + { + return inactiveStats.getStatisticValue(kind); + } // access to void swap(CThorSpillableRowArray &src); @@ -542,7 +546,7 @@ interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock virtual void setup(ICompare *iCompare, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0; virtual void resize(rowidx_t max) = 0; virtual void setOptions(unsigned options) = 0; - virtual unsigned __int64 getStatistic(StatisticKind kind) = 0; + virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0; virtual bool hasSpilt() const = 0; // equivalent to numOverlows() >= 1 virtual void setTracingPrefix(const char *tracing) = 0; virtual void reset() = 0;