From d5f12b62c3ca91fe33c811129d8a107b89daff0f Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Wed, 3 Apr 2024 11:34:04 +0100 Subject: [PATCH] HPCC-28757 New StSizePeakSpill stats for hash dedup Signed-off-by: Shamser Ahmed --- system/jlib/jstatcodes.h | 1 + system/jlib/jstats.cpp | 1 + .../hashdistrib/thhashdistribslave.cpp | 21 ++++++-- thorlcr/graph/thgraphslave.cpp | 20 +++++++- thorlcr/graph/thgraphslave.hpp | 16 +++++++ thorlcr/thorutil/thormisc.cpp | 4 +- thorlcr/thorutil/thormisc.hpp | 48 +++++++++++++++++++ 7 files changed, 103 insertions(+), 8 deletions(-) diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index 2314a94ab65..26d7f33ee47 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -302,6 +302,7 @@ enum StatisticKind StSizeContinuationData, StNumContinuationRequests, StNumFailures, + StSizePeakSpill, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 4dcab0b9ef1..4cb25a93224 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -974,6 +974,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { SIZESTAT(ContinuationData), "The total size of continuation data sent from agent to the server\nA large number may indicate a poor filter, or merging from many different index locations" }, { NUMSTAT(ContinuationRequests), "The number of times the agent indicated there was more data to be returned" }, { NUMSTAT(Failures), "The number of times a query has failed" }, + { PEAKSIZESTAT(PeakSpill), "The high water mark for spill files"}, }; static MapStringTo statisticNameMap(true); diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 89b95eea189..6a5558073fc 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -2648,17 +2648,18 @@ class CSpill : implements 
IRowWriter, public CSimpleInterface
     CActivityBase &owner;
     IThorRowInterfaces *rowIf;
     rowcount_t count;
-    Owned<CFileOwner> spillFile;
+    Owned<CFileOwnerSizeUpdater> spillFile;
     Owned spillFileIO;
     IRowWriter *writer;
     StringAttr desc;
     unsigned bucketN, rwFlags;
+    Linked<FilesSizeTracker> spillsSizeTracker;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN)
-        : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN)
+    CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, FilesSizeTracker * _spillsSizeTracker)
+        : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), spillsSizeTracker(_spillsSizeTracker)
     {
         count = 0;
         writer = NULL;
@@ -2676,7 +2677,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
         prefix.append(bucketN).append('_').append(desc);
         GetTempFilePath(tempname, prefix.str());
         OwnedIFile iFile = createIFile(tempname.str());
-        spillFile.setown(new CFileOwner(iFile.getLink()));
+        spillFile.setown(new CFileOwnerSizeUpdater(iFile.getLink(), spillsSizeTracker));
         if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))
         {
             rwFlags |= rw_compress;
@@ -2712,6 +2713,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
         writer = NULL;
         spillFileIO->flush();
         mergeStats(stats, this);
+        spillFile->noteSpill(getStatistic(StSizeSpillFile));
         spillFileIO.clear();
     }
     inline __int64 getStatistic(StatisticKind kind) const
@@ -3174,6 +3176,7 @@ class HashDedupSlaveActivityBase : public CSlaveActivity
         bucketHandlerStack.kill();
         CSlaveActivity::kill();
     }
+
     CATCH_NEXTROW()
     {
         ActivityTimer t(slaveTimerStats, timeActivities);
@@ -3299,6 +3302,15 @@ class HashDedupSlaveActivityBase : public CSlaveActivity
 
     virtual bool isGrouped() const override { return grouped; }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override = 0;
+
+    // Gathers the parent class's active stats, then sets StSizePeakSpill from queryPeakSpillSize()
+    virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const override
+    {
+        PARENT::gatherActiveStats(activeStats);
+        unsigned __int64 peakSpill = queryPeakSpillSize();
+        activeStats.setStatistic(StSizePeakSpill, peakSpill);
+    }
+
     friend class CBucketHandler;
     friend class CHashTableRowTable;
     friend class CBucket;
@@ -3373,7 +3385,7 @@ void CHashTableRowTable::rehash(const void **newRows)
 
 CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
     : owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
-    rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
+    rowSpill(owner, _rowIf, "rows", _bucketN, _owner.queryFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryFileSizeTracker())
 {
     spilt = false;
 
diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp
index 6c1d9868eba..fbb137c7781 100644
--- a/thorlcr/graph/thgraphslave.cpp
+++ b/thorlcr/graph/thgraphslave.cpp
@@ -1280,8 +1280,26 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
     jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats);
 
     IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler();
+    offset_t sizeGraphSpill = tempHandler ? 
tempHandler->getActiveUsageSize() : 0;
     if (tempHandler)
-        stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize());
+        stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill);
+
+    // StSizePeakSpill: high water mark of (graph spill + activity spill) as sampled on each call
+    unsigned __int64 totalSpillSize = sizeGraphSpill;
+    if (started&&initialized)
+    {
+        Owned iter = getConnectedIterator();
+        ForEach (*iter)
+        {
+            CGraphElementBase &element = iter->query();
+            CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
+            totalSpillSize += activity.queryActiveSpillSize();
+        }
+    }
+    if (totalSpillSize > peakSpillSize)
+        peakSpillSize = totalSpillSize;
+    if (peakSpillSize)
+        stats.mergeStatistic(StSizePeakSpill, peakSpillSize);
 
     stats.serialize(mb);
     unsigned cPos = mb.length();
diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp
index 7b43999dbab..48a66cc9cdd 100644
--- a/thorlcr/graph/thgraphslave.hpp
+++ b/thorlcr/graph/thgraphslave.hpp
@@ -214,6 +214,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
     // fileStats is in this base class as it used by multiple derived classes (both slave and master) but not all.
     // (Having it in the base class aids setup and resizing.)
     mutable std::vector> fileStats;
+    Owned<FilesSizeTracker> spillsSizeTracker;
 
 protected:
     unsigned __int64 queryLocalCycles() const;
@@ -262,6 +263,23 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
 
     bool canStall() const;
     bool isFastThrough() const;
+    // Returns the tracker for this activity's spill files, creating it on first use
+    FilesSizeTracker * queryFileSizeTracker()
+    {
+        if (!spillsSizeTracker)
+            spillsSizeTracker.setown(new FilesSizeTracker);
+        return spillsSizeTracker;
+    }
+    // Active size reported by the tracker, or 0 if no tracker has been created yet
+    unsigned __int64 queryActiveSpillSize() const
+    {
+        return spillsSizeTracker ? spillsSizeTracker->queryActiveSize() : 0;
+    }
+    // Peak size reported by the tracker, or 0 if no tracker has been created yet
+    unsigned __int64 queryPeakSpillSize() const
+    {
+        return spillsSizeTracker ? spillsSizeTracker->queryPeakSize() : 0;
+    }
 
     // IThorDataLink
     virtual CSlaveActivity *queryFromActivity() override { return this; }
@@ -516,6 +534,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase
     bool doneInit = false;
     std::atomic_bool progressActive;
     ProcessInfo processStartInfo;
+    unsigned __int64 peakSpillSize = 0;
 
 public:
 
diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp
index c39a612e50b..cf3c29c13f4 100644
--- a/thorlcr/thorutil/thormisc.cpp
+++ b/thorlcr/thorutil/thormisc.cpp
@@ -88,11 +88,11 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows},
 const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics);
 const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
 const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
-const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
+const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
 const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
 const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
 const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
-const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed}, diskWriteRemoteStatistics, basicActivityStatistics);
+const StatisticsMapping hashDedupActivityStatistics({StNumSpills, 
StSizeSpillFile, StTimeSortElapsed, StSizePeakSpill}, diskWriteRemoteStatistics, basicActivityStatistics);
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp
index 21cae128b7f..c6f679baf66 100644
--- a/thorlcr/thorutil/thormisc.hpp
+++ b/thorlcr/thorutil/thormisc.hpp
@@ -359,6 +359,69 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf
     }
 };
 
+// Tracks the current and peak storage used for some files
+class FilesSizeTracker: public CInterface
+{
+    RelaxedAtomic<unsigned __int64> activeSize{0};
+    RelaxedAtomic<unsigned __int64> peakSize{0};
+public:
+    // Adds size to the active total and raises the peak if the new total exceeds it
+    void growSize(unsigned __int64 size)
+    {
+        if (size)
+        {
+            unsigned __int64 newActiveSize = activeSize.add_fetch(size);
+            peakSize.store_max(newActiveSize);
+        }
+    }
+    // Subtracts size from the active total; the peak is left unchanged
+    void shrinkSize(unsigned __int64 size)
+    {
+        if (size)
+            activeSize.fetch_sub(size);
+    }
+    unsigned __int64 queryActiveSize() const
+    {
+        return activeSize.load();
+    }
+    unsigned __int64 queryPeakSize() const
+    {
+        return peakSize.load();
+    }
+};
+
+// A CFileOwner that reports the size of its file to an (optional) FilesSizeTracker,
+// and removes that size from the tracker again when the owner is destroyed
+class CFileOwnerSizeUpdater : public CFileOwner
+{
+    Linked<FilesSizeTracker> filesSizeTracker;
+    __int64 fileSize = 0; // size most recently passed to noteSpill
+public:
+    CFileOwnerSizeUpdater(IFile *_iFile, FilesSizeTracker * _filesSizeTracker): CFileOwner(_iFile), filesSizeTracker(_filesSizeTracker)
+    {}
+
+    ~CFileOwnerSizeUpdater()
+    {
+        if (filesSizeTracker)
+            filesSizeTracker->shrinkSize(fileSize);
+    }
+
+    // Records the current size of the file. May be called more than once: only the
+    // change since the previous call is applied, so the tracker never double counts.
+    void noteSpill(__int64 size)
+    {
+        if (size < 0)
+            size = 0; // a negative value would otherwise wrap to a huge unsigned size
+        if (filesSizeTracker)
+        {
+            if (size > fileSize)
+                filesSizeTracker->growSize(size - fileSize);
+            else
+                filesSizeTracker->shrinkSize(fileSize - size);
+        }
+        fileSize = size;
+    }
+};
 #define DEFAULT_THORMASTERPORT 20000
 #define DEFAULT_THORSLAVEPORT 20100