From 5b67f034e088ae5ca01b2ff643c9eeee6a84fc63 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Wed, 3 Jul 2024 12:04:04 +0100 Subject: [PATCH] HPCC-32193 Fix some issues with spill stats in smart join activity The current temp file statistics for smartjoin did not include all stats from all temp files: 1) temp files were closed before its sizes were recorded in the stats 2) stats from some types of temp files were not being tracked such as overflowWriteFile from RHS 3) stats from temp files that were closed in CSpillableStreamBase were not preserved 4) peak temp file size was not tracked in CThorSpillableRowArray 5) make CThorRowCollectorBase use of stats from CThorSpillableRowArray for more accurate and simpler temp stats tracking. Signed-off-by: Shamser Ahmed HPCC-32193 Close overflowWriteFileIO when it's no longer needed Signed-off-by: Shamser Ahmed --- .../lookupjoin/thlookupjoinslave.cpp | 59 +++++++++++++++---- .../activities/nsplitter/thnsplitterslave.cpp | 2 +- thorlcr/msort/tsorts.cpp | 2 +- thorlcr/thorutil/thbuf.cpp | 14 ++--- thorlcr/thorutil/thmem.cpp | 48 +++++++-------- thorlcr/thorutil/thmem.hpp | 8 ++- thorlcr/thorutil/thormisc.cpp | 1 + thorlcr/thorutil/thormisc.hpp | 2 + 8 files changed, 86 insertions(+), 50 deletions(-) diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 0441534af72..189e39c56e3 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -950,7 +950,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } } *rowProcessor; - CriticalSection rhsRowLock; + mutable CriticalSection rhsRowLock; Owned broadcaster; CBroadcaster *channel0Broadcaster; CriticalSection *broadcastLock; @@ -1092,7 +1092,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, { CThorSpillableRowArray *rows = rhsSlaveRows.item(a); if (rows) + { + mergeRemappedStats(inactiveStats, rows, diskToTempStatsMap); rows->kill(); + } } rhs.kill(); } @@ -1808,6 +1811,8 @@ class CLookupJoinActivityBase : public CInMemJoinBase overflowWriteFile; Owned overflowWriteStream; + OwnedIFileIO overflowWriteFileIO; + mutable CriticalSection critOverflowWriteFileIO; rowcount_t overflowWriteCount; OwnedMalloc channelDistributors; unsigned nextRhsToSpill = 0; @@ -2096,7 +2101,6 @@ class CLookupJoinActivityBase : public CInMemJoinBaseaddRowBuffer(this); } @@ -2107,7 +2111,15 @@ class CLookupJoinActivityBase : public CInMemJoinBasenoteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); + overflowWriteFileIO.clear(); + } overflowWriteStream.clear(); // broadcast has finished, no more can be written + } } if (!hasFailedOverToLocal()) { @@ -2155,6 +2167,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase rowLoader = createThorRowLoader(*this, queryRowInterfaces(leftITDL), helper->isLeftAlreadyLocallySorted() ? NULL : compareLeft); rowLoader->setTracingPrefix("Join left"); left.setown(rowLoader->load(left, abortSoon, false)); + mergeRemappedStats(PARENT::inactiveStats, rowLoader, diskToTempStatsMap); leftITDL = queryInput(0); // reset ActPrintLog("LHS loaded/sorted"); @@ -2550,6 +2564,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase rightCollector; + Owned exception; try { CMarker marker(*this); @@ -2674,12 +2689,19 @@ class CLookupJoinActivityBase : public CInMemJoinBasequeryFromActivity()->queryContainer().queryHelper()->queryOutputMeta(); - // rows may either be in separate slave row arrays or in single rhs array, or split. - rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality()); - throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL); + exception.setown(e); + else + { + IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta(); + // rows may either be in separate slave row arrays or in single rhs array, or split. + rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality()); + exception.setown(checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL)); + } } + if (rightCollector && rightCollector->hasSpilt()) + mergeRemappedStats(PARENT::inactiveStats, rightCollector, diskToTempStatsMap); + if (exception) + throw exception.getClear(); } public: static bool needDedup(IHThorHashJoinArg *helper) @@ -2943,7 +2965,7 @@ class CLookupJoinActivityBase : public CInMemJoinBaseputRow(rhsInRowsTemp.getClear(r)); - overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite)); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); return true; } if (hasFailedOverToLocal()) @@ -3001,12 +3023,13 @@ class CLookupJoinActivityBase : public CInMemJoinBasecreateOwnedTempFile(tempFilename.str())); - overflowWriteStream.setown(createRowWriter(&(overflowWriteFile->queryIFile()), queryRowInterfaces(rightITDL), rwFlags)); + overflowWriteFileIO.setown(overflowWriteFile->queryIFile().open(IFOcreate)); + overflowWriteStream.setown(createRowWriter(overflowWriteFileIO, queryRowInterfaces(rightITDL), rwFlags)); overflowWriteCount += rhsInRowsTemp.ordinality(); ForEachItemIn(r, rhsInRowsTemp) overflowWriteStream->putRow(rhsInRowsTemp.getClear(r)); - overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite)); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); return true; } virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const @@ -3018,6 +3041,19 @@ class CLookupJoinActivityBase : public CInMemJoinBase ForEachItemIn(a, rhsSlaveRows) { CThorSpillableRowArray &rows = *rhsSlaveRows.item(a); + mergeRemappedStats(PARENT::inactiveStats, &rows, diskToTempStatsMap); rhs.appendRows(rows, true); rows.kill(); // free up ptr table asap } diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index ef47bcc27bf..50da35c613f 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -405,7 +405,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf { PARENT::gatherActiveStats(activeStats); if (sharedRowStream) - ::mergeStats(activeStats, sharedRowStream); + mergeRemappedStats(activeStats, sharedRowStream, diskToTempStatsMap); } // ISharedSmartBufferCallback impl. virtual void paged() { pagedOut = true; } diff --git a/thorlcr/msort/tsorts.cpp b/thorlcr/msort/tsorts.cpp index 7e58fa50ea1..10a95e6af0d 100644 --- a/thorlcr/msort/tsorts.cpp +++ b/thorlcr/msort/tsorts.cpp @@ -1338,7 +1338,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements throw; } - mergeStats(spillStats, sortedloader); + mergeRemappedStats(spillStats, sortedloader, diskToTempStatsMap); if (!abort) { diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 2e45c7c78d8..ad2d42bdb60 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -2449,7 +2449,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); - updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current + updateStatsDelta(stats, previousFileStats, iFileIO); // NB: also updates prev to current previousFileStats.reset(); iFileIO.clear(); } } void createOutputStream() { - closeWriter(); // Ensure stats from closing files are preserved in inactiveStats + closeWriter(); // Ensure stats from closing files are preserved in stats // NB: Called once, when spilling starts. tempFileOwner.setown(activity.createOwnedTempFile(baseTmpFilename)); auto res = createSerialOutputStream(&(tempFileOwner->queryIFile()), compressHandler, options, numOutputs + 1); outputStream.setown(std::get<0>(res)); iFileIO.setown(std::get<1>(res)); totalInputRowsRead = inMemTotalRows; - inactiveStats.addStatistic(StNumSpills, 1); + stats.addStatistic(StNumSpills, 1); } void writeRowsFromInput() { @@ -2539,7 +2539,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); totalInputRowsRead.fetch_add(newRowsWritten); tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); - updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current + updateStatsDelta(stats, previousFileStats, iFileIO); // NB: also updates prev to current // JCSMORE - could track size written, and start new file at this point (e.g. every 100MB), // and track their starting points (by row #) in a vector // We could then tell if/when the readers catch up, and remove consumed files as they do. @@ -2553,7 +2553,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfqueryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), - inactiveStats(spillStatistics), previousFileStats(spillStatistics) + stats(tempFileStatistics), previousFileStats(tempFileStatistics) { assertex(input); @@ -2717,7 +2717,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf spillFile; + CRuntimeStatisticCollection stats; bool spillRows() { @@ -248,13 +249,13 @@ class CSpillableStreamBase : public CSpillable spillFile.setown(activity.createOwnedTempFile(tempName.str())); VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority); rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows + mergeStats(stats, &rows); rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded - spillFile->noteSize(spillFile->queryIFile().size()); return true; } public: CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority) - : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics) + : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics), stats(diskRemoteStatistics) { assertex(inRows.isFlushed()); spillCompInfo = 0x0; @@ -265,6 +266,10 @@ class CSpillableStreamBase : public CSpillable { ensureSpillingCallbackRemoved(); } + unsigned __int64 getStatistic(StatisticKind kind) const + { + return stats.getStatisticValue(kind); + } // IBufferedRowCallback virtual bool freeBufferedRows(bool critical) override { @@ -1328,13 +1333,13 @@ void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb) } CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity) - : CThorExpandingRowArray(activity) + : CThorExpandingRowArray(activity), stats(tempFileStatistics) { throwOnOom = false; } CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta) - : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta) + : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta), stats(tempFileStatistics) { } @@ -1363,6 +1368,7 @@ void CThorSpillableRowArray::kill() { clearRows(); CThorExpandingRowArray::kill(); + stats.reset(); } void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores) @@ -1413,7 +1419,8 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom nextCB = &cbCopy.popGet(); nextCBI = nextCB->queryRecordNumber(); } - Owned writer = createRowWriter(&iFileOwner.queryIFile(), rowIf, rwFlags, nullptr, compBlkSz); + OwnedIFileIO iFileIO = iFileOwner.queryIFile().open(IFOcreate); + Owned writer = createRowWriter(iFileIO, rowIf, rwFlags, nullptr, compBlkSz); rowidx_t i=0; rowidx_t rowsWritten=0; try @@ -1452,7 +1459,6 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom ++i; } writer->flush(NULL); - iFileOwner.noteSize(writer->getStatistic(StSizeDiskWrite)); } catch (IException *e) { @@ -1463,6 +1469,10 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom firstRow += n; offset_t bytesWritten = writer->getPosition(); writer.clear(); + mergeStats(stats, iFileIO); + offset_t sizeTempFile = iFileIO->getStatistic(StSizeDiskWrite); + iFileOwner.noteSize(sizeTempFile); + stats.addStatistic(StNumSpills, 1); ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u, firstRow = %u", _tracingPrefix, rowsWritten, (__int64)bytesWritten, firstRow); return rowsWritten; } @@ -1638,11 +1648,7 @@ class CThorRowCollectorBase : public CSpillable Owned spillableRowSet; unsigned options = 0; unsigned spillCompInfo = 0; - RelaxedAtomic statOverflowCount{0}; - RelaxedAtomic statSizeSpill{0}; - RelaxedAtomic<__uint64> statSpillCycles{0}; RelaxedAtomic<__uint64> statSortCycles{0}; - bool spillRows(bool critical) { //This must only be called while a lock is held on spillableRows @@ -1668,11 +1674,6 @@ class CThorRowCollectorBase : public CSpillable spillableRows.save(*tempFileOwner, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows spillFiles.append(tempFileOwner.getLink()); ++overflowCount; - statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class - offset_t tempFileSize = tempFileOwner->queryIFile().size(); - statSizeSpill.fastAdd(tempFileSize); - tempFileOwner->noteSize(tempFileSize); - statSpillCycles.fastAdd(spillTimer.elapsedCycles()); return true; } void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics) @@ -1960,26 +1961,17 @@ class CThorRowCollectorBase : public CSpillable { options = _options; } - unsigned __int64 getStatistic(StatisticKind kind) + unsigned __int64 getStatistic(StatisticKind kind) const { switch (kind) { - case StCycleSpillElapsedCycles: - return statSpillCycles; case StCycleSortElapsedCycles: return statSortCycles; - case StTimeSpillElapsed: - return cycle_to_nanosec(statSpillCycles); case StTimeSortElapsed: return cycle_to_nanosec(statSortCycles); - case StNumSpills: - return statOverflowCount; - case StSizeSpillFile: - return statSizeSpill; default: - break; + return spillableRows.getStatistic(kind); } - return 0; } bool hasSpilt() const { return overflowCount >= 1; } @@ -2048,7 +2040,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader } virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); } virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); } - virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); } virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); } virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); } virtual void reset() override { CThorRowCollectorBase::reset(); } @@ -2103,7 +2095,7 @@ class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowColle } virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); } virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); } - virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); } virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); } virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); } // IThorArrayLock diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index 8e4f1b896a8..a89d4cdd0f2 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -413,7 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem mutable CriticalSection cs; ICopyArrayOf writeCallbacks; size32_t compBlkSz = 0; // means use default - + CRuntimeStatisticCollection stats; // reset after each kill bool _flush(bool force); void doFlush(); inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); } @@ -484,6 +484,10 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe! inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommited rows + inline unsigned __int64 getStatistic(StatisticKind kind) const + { + return stats.getStatisticValue(kind); + } // access to void swap(CThorSpillableRowArray &src); @@ -542,7 +546,7 @@ interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock virtual void setup(ICompare *iCompare, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0; virtual void resize(rowidx_t max) = 0; virtual void setOptions(unsigned options) = 0; - virtual unsigned __int64 getStatistic(StatisticKind kind) = 0; + virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0; virtual bool hasSpilt() const = 0; // equivalent to numOverlows() >= 1 virtual void setTracingPrefix(const char *tracing) = 0; virtual void reset() = 0; diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index a9e493e02e4..70bdedb9361 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -97,6 +97,7 @@ const StatisticsMapping hashDedupActivityStatistics({}, spillStatistics, diskWri const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics); const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakTempDisk, StSizePeakEphemeralDisk, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, executeStatistics); +const StatisticsMapping tempFileStatistics({StNumSpills}, diskRemoteStatistics); const StatKindMap diskToTempStatsMap ={ {StSizeDiskWrite, StSizeSpillFile}, diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 3159d58ad48..17ebe967e8b 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -167,6 +167,8 @@ extern graph_decl const StatisticsMapping soapcallActivityStatistics; extern graph_decl const StatisticsMapping indexReadFileStatistics; extern graph_decl const StatisticsMapping hashDedupActivityStatistics; extern graph_decl const StatisticsMapping hashDistribActivityStatistics; +extern graph_decl const StatisticsMapping tempFileStatistics; + // Maps disk related stats to spill stats extern graph_decl const std::map diskToTempStatsMap;