From 1a124262928dfefb72d7760c1bc345a9ac607da5 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Wed, 3 Apr 2024 11:34:04 +0100 Subject: [PATCH] HPCC-28757 New StSizePeakEphemeralStorage and StSizePeakSubgraphTemp for graphs StSizePeakSubgraphTemp tracks the high water mark for intra-graph temp files. StSizePeakEphemeralStorage tracks the overall high water mark for temp files (i.e. both intra-graph and inter-graph temp files). Note: at present, only hash dedup produces the information necessary for generating these 2 stats and so the stats will not be accurate until further work is completed. Other activities that make use of temp files (such as join and sort) will need to be amended to provide the info necessary to produce these stats. Signed-off-by: Shamser Ahmed --- system/jlib/jstatcodes.h | 2 + system/jlib/jstats.cpp | 4 +- .../hashdistrib/thhashdistribslave.cpp | 10 +++-- thorlcr/graph/thgraph.hpp | 12 ++++++ thorlcr/graph/thgraphslave.cpp | 22 +++++++++- thorlcr/graph/thgraphslave.hpp | 2 +- thorlcr/thorutil/thormisc.cpp | 2 +- thorlcr/thorutil/thormisc.hpp | 43 ++++++++++++++++++- 8 files changed, 87 insertions(+), 10 deletions(-) diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index 2314a94ab65..f8e22d19861 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -302,6 +302,8 @@ enum StatisticKind StSizeContinuationData, StNumContinuationRequests, StNumFailures, + StSizePeakSubgraphTemp, + StSizePeakEphemeralStorage, StMax, //For any quantity there is potentially the following variants. 
diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 4dcab0b9ef1..f1c717ccaa8 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -944,7 +944,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { CYCLESTAT(LeafFetch) }, { TIMESTAT(BlobFetch), "Time spent reading blobs from disk (EXCLUDING the linux page cache)" }, { CYCLESTAT(BlobFetch) }, - { PEAKSIZESTAT(GraphSpill), "Peak size of spill memory usage" }, + { PEAKSIZESTAT(GraphSpill), "Peak size of graph spills" }, { TIMESTAT(AgentQueue), "Time worker items were received and queued before being processed\nThis may indicate that the primary node on a channel was down, or that the workers are overloaded with requests" }, { CYCLESTAT(AgentQueue) }, { TIMESTAT(IBYTIDelay), "Time spent waiting for another worker to start processing a request\nA non-zero value indicates that the primary node on a channel was down or very busy" }, @@ -974,6 +974,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { SIZESTAT(ContinuationData), "The total size of continuation data sent from agent to the server\nA large number may indicate a poor filter, or merging from many different index locations" }, { NUMSTAT(ContinuationRequests), "The number of times the agent indicated there was more data to be returned" }, { NUMSTAT(Failures), "The number of times a query has failed" }, + { PEAKSIZESTAT(PeakSubgraphTemp), "The high water mark for intra-graph temp files"}, + { PEAKSIZESTAT(PeakEphemeralStorage), "The high water mark for emphemeral storage use"}, }; static MapStringTo statisticNameMap(true); diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 89b95eea189..c495e1ab2cb 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -2653,12 +2653,13 @@ class CSpill : implements IRowWriter, public CSimpleInterface IRowWriter *writer; 
StringAttr desc; unsigned bucketN, rwFlags; + Linked tempFileSizeTracker; public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN) - : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN) + CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, CFileSizeTracker * _tempFileSizeTracker) + : owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), tempFileSizeTracker(_tempFileSizeTracker) { count = 0; writer = NULL; @@ -2676,7 +2677,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface prefix.append(bucketN).append('_').append(desc); GetTempFilePath(tempname, prefix.str()); OwnedIFile iFile = createIFile(tempname.str()); - spillFile.setown(new CFileOwner(iFile.getLink())); + spillFile.setown(new CFileOwner(iFile.getLink(), tempFileSizeTracker)); if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true)) { rwFlags |= rw_compress; @@ -2712,6 +2713,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface writer = NULL; spillFileIO->flush(); mergeStats(stats, this); + spillFile->noteSpill(getStatistic(StSizeSpillFile)); spillFileIO.clear(); } inline __int64 getStatistic(StatisticKind kind) const @@ -3373,7 +3375,7 @@ void CHashTableRowTable::rehash(const void **newRows) CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows) : owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows), - rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN) + rowSpill(owner, _rowIf, "rows", _bucketN, _owner.queryTempFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryTempFileSizeTracker()) { spilt = false; diff --git a/thorlcr/graph/thgraph.hpp b/thorlcr/graph/thgraph.hpp index ab5a28fc326..b9db6d43c8f 100644 --- 
a/thorlcr/graph/thgraph.hpp +++ b/thorlcr/graph/thgraph.hpp @@ -1104,6 +1104,7 @@ class graph_decl CActivityBase : implements CInterfaceOf, im CSingletonLock CABserializerlock; CSingletonLock CABdeserializerlock; roxiemem::RoxieHeapFlags defaultRoxieMemHeapFlags = roxiemem::RHFnone; + Owned tempFileSizeTracker; protected: CGraphElementBase &container; @@ -1171,6 +1172,17 @@ class graph_decl CActivityBase : implements CInterfaceOf, im IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0); IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0); + CFileSizeTracker * queryTempFileSizeTracker() + { + if (!tempFileSizeTracker) + tempFileSizeTracker.setown(new CFileSizeTracker); + return tempFileSizeTracker; + } + unsigned __int64 queryActiveTempSize() const + { + return tempFileSizeTracker ? tempFileSizeTracker->queryActiveSize() : 0; + } + // IExceptionHandler bool fireException(IException *e); __declspec(noreturn) void processAndThrowOwnedException(IException * e) __attribute__((noreturn)); diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index 6c1d9868eba..83bc314b50c 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -1280,8 +1280,28 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb) jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats); IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler(); + offset_t sizeGraphSpill = tempHandler ? 
tempHandler->getActiveUsageSize() : 0; if (tempHandler) - stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize()); + stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill); + + // calculate peak spill size + if (started&&initialized) + { + unsigned __int64 activeTempSize = 0; + Owned iter = getConnectedIterator(); + ForEach (*iter) + { + CGraphElementBase &element = iter->query(); + CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity(); + activeTempSize += activity.queryActiveTempSize(); + } + if (activeTempSize > peakTempSize) + peakTempSize = activeTempSize; + } + if (peakTempSize) + stats.mergeStatistic(StSizePeakSubgraphTemp, peakTempSize); + if (peakTempSize + sizeGraphSpill) + stats.mergeStatistic(StSizePeakEphemeralStorage, peakTempSize + sizeGraphSpill); stats.serialize(mb); unsigned cPos = mb.length(); diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 7b43999dbab..88db0c7d6bf 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -262,7 +262,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres bool canStall() const; bool isFastThrough() const; - // IThorDataLink virtual CSlaveActivity *queryFromActivity() override { return this; } virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override; @@ -516,6 +515,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase bool doneInit = false; std::atomic_bool progressActive; ProcessInfo processStartInfo; + unsigned __int64 peakTempSize = 0; public: diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index c39a612e50b..8a651c5b0e1 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -88,7 +88,7 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, 
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics); const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); -const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); +const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakSubgraphTemp, StSizePeakEphemeralStorage, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics); const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 21cae128b7f..069829cc7f4 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -306,22 +306,62 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded virtual bool action() = 0; }; +// Tracks the current and peak storage used for some files +class CFileSizeTracker: public CInterface +{ + RelaxedAtomic activeSize{0}; + RelaxedAtomic peakSize{0}; +public: + void growSize(unsigned __int64 size) + { + if (size) + { + unsigned __int64 newActiveSize = activeSize.add_fetch(size); + peakSize.store_max(newActiveSize); + } + } + void shrinkSize(unsigned __int64 size) + { + if (size) + activeSize.fetch_sub(size); + } + unsigned __int64 queryActiveSize() const + { + return activeSize.load(); + 
} + unsigned __int64 queryPeakSize() const + { + return peakSize.load(); + } +}; + // simple class which takes ownership of the underlying file and deletes it on destruction class graph_decl CFileOwner : public CSimpleInterface, implements IInterface { OwnedIFile iFile; + Linked fileSizeTracker; + offset_t fileSize = 0; public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CFileOwner(IFile *_iFile) : iFile(_iFile) + CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker) { } ~CFileOwner() { + if (fileSizeTracker) + fileSizeTracker->shrinkSize(fileSize); iFile->remove(); } + void noteSpill(offset_t size) + { + fileSize = size; + if (fileSizeTracker) + fileSizeTracker->growSize(fileSize); + } IFile &queryIFile() const { return *iFile; } }; + // stream wrapper, that takes ownership of a CFileOwner class graph_decl CStreamFileOwner : public CSimpleInterfaceOf { @@ -359,7 +399,6 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf } }; - #define DEFAULT_THORMASTERPORT 20000 #define DEFAULT_THORSLAVEPORT 20100 #define DEFAULT_SLAVEPORTINC 20