From 2a673937051af464d849ff53f944026f50a6927d Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Wed, 3 Jul 2024 12:04:04 +0100 Subject: [PATCH] HPCC-32193 Fix some issues with spill stats in smart join activity Signed-off-by: Shamser Ahmed --- .../lookupjoin/thlookupjoinslave.cpp | 35 ++++++++++++-- thorlcr/thorutil/thmem.cpp | 47 ++++++++----------- thorlcr/thorutil/thmem.hpp | 8 +++- 3 files changed, 56 insertions(+), 34 deletions(-) diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 3a0da40d7f3..40c0131bdac 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -1053,7 +1053,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, { CThorSpillableRowArray *rows = rhsSlaveRows.item(a); if (rows) + { + mergeStats(inactiveStats, rows); rows->kill(); + } } rhs.kill(); } @@ -1769,6 +1772,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase overflowWriteFile; Owned overflowWriteStream; + OwnedIFileIO overflowWriteFileIO; rowcount_t overflowWriteCount; OwnedMalloc channelDistributors; unsigned nextRhsToSpill = 0; @@ -2057,8 +2061,14 @@ class CLookupJoinActivityBase : public CInMemJoinBasenoteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); + overflowWriteFileIO.clear(); + } + overflowWriteFile.clear(); rightRowManager->addRowBuffer(this); } doBroadcastRHS(stopping); @@ -2117,6 +2127,7 @@ class CLookupJoinActivityBase : public CInMemJoinBaseinit(left, right, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL)); + mergeStats(PARENT::inactiveStats, rowLoader); } void getRHS(bool stopping) { @@ -2639,8 +2651,12 @@ class CLookupJoinActivityBase : public CInMemJoinBasequeryFromActivity()->queryContainer().queryHelper()->queryOutputMeta(); // rows may either be in separate slave row arrays or in single rhs array, or split. rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality()); + if (rightCollector && rightCollector->hasSpilt()) + mergeStats(PARENT::inactiveStats, rightCollector); throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL); } + if (rightCollector && rightCollector->hasSpilt()) + mergeStats(PARENT::inactiveStats, rightCollector); } public: static bool needDedup(IHThorHashJoinArg *helper) @@ -2904,7 +2920,7 @@ class CLookupJoinActivityBase : public CInMemJoinBaseputRow(rhsInRowsTemp.getClear(r)); - overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite)); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); return true; } if (hasFailedOverToLocal()) @@ -2962,12 +2978,13 @@ class CLookupJoinActivityBase : public CInMemJoinBasecreateOwnedTempFile(tempFilename.str())); - overflowWriteStream.setown(createRowWriter(&(overflowWriteFile->queryIFile()), queryRowInterfaces(rightITDL), rwFlags)); + overflowWriteFileIO.setown(overflowWriteFile->queryIFile().open(IFOcreate)); + overflowWriteStream.setown(createRowWriter(overflowWriteFileIO, queryRowInterfaces(rightITDL), rwFlags)); overflowWriteCount += rhsInRowsTemp.ordinality(); ForEachItemIn(r, rhsInRowsTemp) overflowWriteStream->putRow(rhsInRowsTemp.getClear(r)); - overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite)); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); return true; } virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const @@ -2979,6 +2996,13 @@ class CLookupJoinActivityBase : public CInMemJoinBase ForEachItemIn(a, rhsSlaveRows) { CThorSpillableRowArray &rows = *rhsSlaveRows.item(a); + mergeStats(PARENT::inactiveStats, &rows); rhs.appendRows(rows, true); rows.kill(); // free up ptr table asap } diff --git a/thorlcr/thorutil/thmem.cpp b/thorlcr/thorutil/thmem.cpp index 299f1e3118a..ad51a7f3b6b 100644 --- a/thorlcr/thorutil/thmem.cpp +++ b/thorlcr/thorutil/thmem.cpp @@ -234,6 +234,7 @@ class CSpillableStreamBase : public CSpillable unsigned spillCompInfo; CThorSpillableRowArray rows; Owned spillFile; + CRuntimeStatisticCollection inactiveStats; bool spillRows() { @@ -248,13 +249,13 @@ class CSpillableStreamBase : public CSpillable spillFile.setown(activity.createOwnedTempFile(tempName.str())); VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority); rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows + mergeStats(inactiveStats, &rows); rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded - spillFile->noteSize(spillFile->queryIFile().size()); return true; } public: CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority) - : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics) + : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics), inactiveStats(spillStatistics) { assertex(inRows.isFlushed()); spillCompInfo = 0x0; @@ -265,6 +266,10 @@ class CSpillableStreamBase : public CSpillable { ensureSpillingCallbackRemoved(); } + unsigned __int64 getStatistic(StatisticKind kind) const + { + return inactiveStats.getStatisticValue(kind); + } // IBufferedRowCallback virtual bool freeBufferedRows(bool critical) override { @@ -1317,13 +1322,13 @@ void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb) } CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity) - : CThorExpandingRowArray(activity) + : CThorExpandingRowArray(activity), inactiveStats(spillStatistics) { throwOnOom = false; } CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta) - : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta) + : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta), inactiveStats(spillStatistics) { } @@ -1352,6 +1357,7 @@ void CThorSpillableRowArray::kill() { clearRows(); CThorExpandingRowArray::kill(); + inactiveStats.reset(); } void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores) @@ -1405,7 +1411,8 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom nextCB = &cbCopy.popGet(); nextCBI = nextCB->queryRecordNumber(); } - Owned writer = createRowWriter(&iFileOwner.queryIFile(), rowIf, rwFlags, nullptr, compBlkSz); + OwnedIFileIO iFileIO = iFileOwner.queryIFile().open(IFOcreate); + Owned writer = createRowWriter(iFileIO, rowIf, rwFlags, nullptr, compBlkSz); rowidx_t i=0; rowidx_t rowsWritten=0; try @@ -1444,7 +1451,6 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom ++i; } writer->flush(NULL); - iFileOwner.noteSize(writer->getStatistic(StSizeDiskWrite)); } catch (IException *e) { @@ -1455,6 +1461,10 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom firstRow += n; offset_t bytesWritten = writer->getPosition(); writer.clear(); + mergeRemappedStats(inactiveStats, iFileIO, diskToSpillStatsMap); + offset_t sizeTempFile = iFileIO->getStatistic(StSizeDiskWrite); + iFileOwner.noteSize(sizeTempFile); + inactiveStats.addStatistic(StNumSpills, 1); ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u, firstRow = %u", _tracingPrefix, rowsWritten, (__int64)bytesWritten, firstRow); return rowsWritten; } @@ -1630,11 +1640,8 @@ class CThorRowCollectorBase : public CSpillable Owned spillableRowSet; unsigned options = 0; unsigned spillCompInfo = 0; - RelaxedAtomic statOverflowCount{0}; RelaxedAtomic statSizeSpill{0}; - RelaxedAtomic<__uint64> statSpillCycles{0}; RelaxedAtomic<__uint64> statSortCycles{0}; - bool spillRows(bool critical) { //This must only be called while a lock is held on spillableRows @@ -1660,11 +1667,6 @@ class CThorRowCollectorBase : public CSpillable spillableRows.save(*tempFileOwner, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows spillFiles.append(tempFileOwner.getLink()); ++overflowCount; - statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class - offset_t tempFileSize = tempFileOwner->queryIFile().size(); - statSizeSpill.fastAdd(tempFileSize); - tempFileOwner->noteSize(tempFileSize); - statSpillCycles.fastAdd(spillTimer.elapsedCycles()); return true; } void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics) @@ -1952,26 +1954,17 @@ class CThorRowCollectorBase : public CSpillable { options = _options; } - unsigned __int64 getStatistic(StatisticKind kind) + unsigned __int64 getStatistic(StatisticKind kind) const { switch (kind) { - case StCycleSpillElapsedCycles: - return statSpillCycles; case StCycleSortElapsedCycles: return statSortCycles; - case StTimeSpillElapsed: - return cycle_to_nanosec(statSpillCycles); case StTimeSortElapsed: return cycle_to_nanosec(statSortCycles); - case StNumSpills: - return statOverflowCount; - case StSizeSpillFile: - return statSizeSpill; default: - break; + return spillableRows.getStatistic(kind); } - return 0; } bool hasSpilt() const { return overflowCount >= 1; } @@ -2040,7 +2033,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader } virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); } virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); } - virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); } virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); } virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); } virtual void reset() override { CThorRowCollectorBase::reset(); } @@ -2095,7 +2088,7 @@ class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowColle } virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); } virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); } - virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); } virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); } virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); } // IThorArrayLock diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index 8e4f1b896a8..bdcda28efbf 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -413,7 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem mutable CriticalSection cs; ICopyArrayOf writeCallbacks; size32_t compBlkSz = 0; // means use default - + CRuntimeStatisticCollection inactiveStats; // reset after each kill bool _flush(bool force); void doFlush(); inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); } @@ -484,6 +484,10 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe! inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommited rows + inline unsigned __int64 getStatistic(StatisticKind kind) const + { + return inactiveStats.getStatisticValue(kind); + } // access to void swap(CThorSpillableRowArray &src); @@ -542,7 +546,7 @@ interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock virtual void setup(ICompare *iCompare, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0; virtual void resize(rowidx_t max) = 0; virtual void setOptions(unsigned options) = 0; - virtual unsigned __int64 getStatistic(StatisticKind kind) = 0; + virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0; virtual bool hasSpilt() const = 0; // equivalent to numOverlows() >= 1 virtual void setTracingPrefix(const char *tracing) = 0; virtual void reset() = 0;