From 162c2a6c265255a6304a601403243b07e9dbebee Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 14 Nov 2023 14:16:25 +0000 Subject: [PATCH] Changes following review Signed-off-by: Shamser Ahmed --- common/thorhelper/thorcommon.hpp | 4 + dali/base/dadfs.cpp | 3 +- system/jhtree/jhtree.cpp | 5 +- .../activities/indexread/thindexreadslave.cpp | 53 +++---- thorlcr/activities/keyedjoin/thkeyedjoin.cpp | 2 +- .../activities/keyedjoin/thkeyedjoinslave.cpp | 142 +++++++++++------- thorlcr/graph/thgraphmaster.cpp | 6 +- thorlcr/graph/thgraphslave.hpp | 5 +- thorlcr/thorutil/thormisc.cpp | 3 +- thorlcr/thorutil/thormisc.hpp | 1 + 10 files changed, 129 insertions(+), 95 deletions(-) diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index 9edd2ea3741..7a6e8c7d597 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -750,6 +750,10 @@ class CStatsContextLogger : public CSimpleInterfaceOf previous.updateDelta(to, stats); } virtual const LogMsgJobInfo & queryJob() const override { return job; } + void reset() + { + stats.reset(); + } }; extern THORHELPER_API bool isActivitySink(ThorActivityKind kind); diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 08543f4825c..965cc10d057 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -226,8 +226,7 @@ extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWr extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads) { StringBuffer clusterName; - // Should really specify the cluster number too, but this is the best we can do for now - f->getClusterName(0, clusterName); + f->getClusterName(0, clusterName); // Should really specify the cluster number too, but good enough for now return calcFileAccessCost(clusterName, numDiskWrites, numDiskReads); } diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 99bbd5d5684..0afdaedaf9f 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -575,10 +575,11 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const { + // IO Stats comming from the keyCursor and jhtree cache stats coming from this class's stats if (keyCursor) - keyCursor->mergeStats(targetStats); + keyCursor->mergeStats(targetStats); // merge IO stats if (stats.ctx) - targetStats.merge(stats.ctx->queryStats()); + targetStats.merge(stats.ctx->queryStats()); // merge jhtree cache stats } }; diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index 38df9cc39de..cd23817b677 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity if (!keyManager) throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?"); needsBlobCleaning = true; - return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger); + return (byte *) keyManager->loadBlob(id, dummy, nullptr); } void prepareManager(IKeyManager *_keyManager) { @@ -296,7 +296,7 @@ class CIndexReadSlaveBase : public CSlaveActivity part.queryOwner().getClusterLabel(0, planeName); blockedSize = getBlockedFileIOSize(planeName); } - lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize)); + lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize)); RemoteFilename rfn; part.getFilename(0, rfn); @@ -348,44 +348,39 @@ class CIndexReadSlaveBase : public CSlaveActivity else return nullptr; } - void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO) - { - if (!currentManager) - return; - if (fileStats.size()>0) - { - ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor(); - if (superFDesc) - { - unsigned subfile, lnum; - if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum)) - currentManager->mergeStats(*fileStats[fileTableStart+subfile]); - } - else - { - currentManager->mergeStats(*fileStats[fileTableStart]); - } - } - } void updateStats() { + // Merge jhtree cache stats and file io stats for current part // NB: updateStats() should always be called whilst ioStatsCS is held. - if (lazyIFileIO) + if (currentManager) { - mergeStats(inactiveStats, lazyIFileIO); - if (currentPartmergeStats(inactiveStats); + // merge stats for part + if (currentPart0) + { + IPartDescriptor & curPartDesc = partDescs.item(currentPart); + ISuperFileDescriptor * superFDesc = curPartDesc.queryOwner().querySuperFileDescriptor(); + if (superFDesc) + { + unsigned subfile, lnum; + if(superFDesc->mapSubPart(curPartDesc.queryPartIndex(), subfile, lnum)) + currentManager->mergeStats(*fileStats[fileTableStart+subfile]); + } + else + { + currentManager->mergeStats(*fileStats[fileTableStart]); + } + } } } void configureNextInput() { if (currentManager) { - resetManager(currentManager); - currentManager = nullptr; - CriticalBlock b(ioStatsCS); updateStats(); + resetManager(currentManager); + currentManager = nullptr; lazyIFileIO.clear(); } IKeyManager *keyManager = nullptr; @@ -688,7 +683,7 @@ class CIndexReadSlaveBase : public CSlaveActivity } } data.read(fileTableStart); - setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics); + setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadFileStatistics); } } // IThorDataLink diff --git a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp index 8a687180a98..8130a5d9d2f 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp @@ -304,7 +304,7 @@ class CKeyedJoinMaster : public CMasterActivity totalIndexParts = 0; Owned dataFile; - Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadActivityStatistics, &indexFileStatsTableEntry); + Owned indexFile = lookupReadFile(indexFileName, AccessMode::readRandom, false, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, indexReadFileStatistics, &indexFileStatsTableEntry); if (indexFile) { if (!isFileKey(indexFile)) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index a973d9e74d8..ea766f28df5 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -959,8 +959,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { protected: CKeyedJoinSlave &activity; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; IThorRowInterfaces *rowIf; IHThorKeyedJoinArg *helper = nullptr; std::vector queues; @@ -997,8 +995,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } public: CLookupHandler(CKeyedJoinSlave &_activity, IThorRowInterfaces *_rowIf, unsigned _batchProcessLimit) : threaded("CLookupHandler", this), - activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit), - contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, _activity, contextLogger) + activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit) { helper = activity.helper; } @@ -1087,7 +1084,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void stop() { - CStatsScopedDeltaUpdater scoped(statsUpdater); stopped = true; join(); for (auto &queue : queues) @@ -1159,7 +1155,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void flush() { - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); // NB: queueLookup() must be protected from re-entry by caller for (unsigned b=0; bdec(); // unblocks any requests to start lookup threads } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) = 0; + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const = 0; }; class CKeyLookupLocalBase : public CLookupHandler @@ -1288,7 +1283,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void processRows(CThorExpandingRowArray &processing, unsigned partNo, IKeyManager *keyManager) { - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); for (unsigned r=0; rsetAtMostLimitHit(); // also clears existing rows break; } - KLBlobProviderAdapter adapter(keyManager, &contextLogger); + KLBlobProviderAdapter adapter(keyManager, nullptr); byte const * keyRow = keyManager->queryKeyBuffer(); size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t); offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset); @@ -1354,6 +1348,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem //Need ptr to std::vector as std::atomic's are not constructable(doesn't have copy constructor) std::unique_ptr>> keyManagers; + // One context logger per part. (Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; public: CKeyLookupLocalHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) { @@ -1372,6 +1368,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem keyManagers.reset(new std::vector>(parts.size())); for (auto & k: *keyManagers) k = nullptr; + contextLoggers.clear(); + for (unsigned i=0; ireset(); } virtual void addPartNum(unsigned partNum) override { @@ -1386,36 +1391,33 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem std::atomic & keyManager = (*keyManagers)[selected]; if (!keyManager) // delayed until actually needed { - keyManager = activity.createPartKeyManager(partNo, copy, &contextLogger); + keyManager = activity.createPartKeyManager(partNo, copy, contextLoggers[selected]); // NB: potentially translation per part could be different if dealing with superkeys setupTranslation(partNo, selected, *keyManager); } processRows(processing, partNo, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; imergeStats(*fileStats[startOffset+subfile]); - } - else - keyManager->mergeStats(*fileStats[startOffset]); + assertex(parts.size()==contextLoggers.size()); + CStatsContextLogger * contextLogger = contextLoggers[i]; + // Superfile: entry is StartOffset + subfileNum + // Otherwise, use 'startOffset' entry for the stats + unsigned fileEntry = isSuper ? (startOffset+subFileNum[i]) : startOffset; + assertex(fileEntry < fileStats.size()); + fileStats[fileEntry]->merge(contextLogger->queryStats()); } } }; class CKeyLookupMergeHandler : public CKeyLookupLocalBase { typedef CKeyLookupLocalBase PARENT; - + CStatsContextLogger contextLogger; std::atomic keyManager{nullptr}; public: - CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity) + CKeyLookupMergeHandler(CKeyedJoinSlave &_activity) : CKeyLookupLocalBase(_activity), contextLogger(jhtreeCacheStatistics, thorJob) { limiter = &activity.lookupThreadLimiter; translators.push_back(nullptr); @@ -1437,33 +1439,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem Owned keyIndex = activity.createPartKeyIndex(partNo, copy, false); partKeySet->addIndex(keyIndex.getClear()); } - keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, nullptr, helper->hasNewSegmentMonitors(), false); + keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false); setupTranslation(0, 0, *keyManager); } processRows(processing, 0, keyManager); } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { - if (keyManager) - { - for (size_t i=0; imerge(contextLogger.queryStats()); } - }; class CRemoteLookupHandler : public CLookupHandler { typedef CLookupHandler PARENT; - protected: rank_t lookupSlave = RANK_NULL; mptag_t replyTag = TAG_NULL; @@ -1547,14 +1538,12 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem for (auto &h: handles) h = 0; } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override - { - /* Note: currently, stats from remote file not tracked */ - } }; class CKeyLookupRemoteHandler : public CRemoteLookupHandler { typedef CRemoteLookupHandler PARENT; + // One context logger per part. (Extract stats for logical file by mapping part to logical file/subfile) + std::vector> contextLoggers; void initRead(CMessageBuffer &msg, unsigned selected, unsigned partNo, unsigned copy) { @@ -1637,6 +1626,19 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem limiter = &activity.lookupThreadLimiter; allParts = &activity.allIndexParts; } + virtual void init() override + { + PARENT::init(); + contextLoggers.clear(); + for (unsigned i=0; ireset(); + } virtual StringBuffer &getInfo(StringBuffer &info) const override { return PARENT::getInfo(info).append(", lookupSlave=").append(lookupSlave); @@ -1757,12 +1759,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem unsigned __int64 nodeDiskFetches, leafDiskFetches, blobDiskFetches; mb.read(seeks).read(scans).read(wildseeks); mb.read(nodeDiskFetches).read(leafDiskFetches).read(blobDiskFetches); - activity.inactiveStats.sumStatistic(StNumIndexSeeks, seeks); - activity.inactiveStats.sumStatistic(StNumIndexScans, scans); - activity.inactiveStats.sumStatistic(StNumIndexWildSeeks, wildseeks); - activity.inactiveStats.sumStatistic(StNumNodeDiskFetches, nodeDiskFetches); - activity.inactiveStats.sumStatistic(StNumLeafDiskFetches, leafDiskFetches); - activity.inactiveStats.sumStatistic(StNumBlobDiskFetches, blobDiskFetches); + CStatsContextLogger * contextLogger(contextLoggers[selected]); + contextLogger->noteStatistic(StNumIndexSeeks, seeks); + contextLogger->noteStatistic(StNumIndexScans, scans); + contextLogger->noteStatistic(StNumIndexWildSeeks, wildseeks); + contextLogger->noteStatistic(StNumNodeDiskFetches, nodeDiskFetches); + contextLogger->noteStatistic(StNumLeafDiskFetches, leafDiskFetches); + contextLogger->noteStatistic(StNumBlobDiskFetches, blobDiskFetches); if (received == numRows) break; } @@ -1772,6 +1775,19 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem PARENT::end(); doClose(kjs_keyclose); } + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override + { + for (size_t i=0; imerge(contextLogger->queryStats()); + } + } }; class CFetchLocalLookupHandler : public CLookupHandler { @@ -1812,7 +1828,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void process(CThorExpandingRowArray &processing, unsigned selected) override { - CStatsScopedThresholdDeltaUpdater scopedStats(statsUpdater); unsigned partCopy = parts[selected]; unsigned partNo = partCopy & partMask; unsigned copy = partCopy >> partBits; @@ -1873,13 +1888,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem diskSeeks++; } } - virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) override + virtual void getFileStats(std::vector> & fileStats, unsigned startOffset) const override { for (size_t i=0; i> & fileStats, unsigned startOffset) const override {} }; class CReadAheadThread : implements IThreaded { @@ -2199,7 +2216,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem lookupHandler->end(); } } - void getFileStats(std::vector> & fileStats, unsigned startOffset) + void getFileStats(std::vector> & fileStats, unsigned startOffset) const { ForEachItemIn(h, handlers) handlers.item(h)->getFileStats(fileStats, startOffset); @@ -3120,7 +3137,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } ISuperFileDescriptor *superFdesc = numIndexParts?allIndexParts.item(0).queryOwner().querySuperFileDescriptor():nullptr; unsigned numIndexSubFiles = superFdesc?superFdesc->querySubFiles():0; - setupSpace4FileStats(indexFileStatsTableEntry, true, superFdesc!=nullptr, numIndexSubFiles, indexReadActivityStatistics); + setupSpace4FileStats(indexFileStatsTableEntry, true, superFdesc!=nullptr, numIndexSubFiles, indexReadFileStatistics); setupLookupHandlers(keyLookupHandlers, totalIndexParts, superFdesc, localIndexParts, indexPartToSlaveMap, localKey, forceRemoteKeyedLookup ? ht_remotekeylookup : ht_localkeylookup, ht_remotekeylookup); data.read(totalDataParts); if (totalDataParts) @@ -3791,11 +3808,22 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { return container.queryId(); } - virtual void serializeStats(MemoryBuffer &mb) override + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { + PARENT::gatherActiveStats(activeStats); + // fileStats gather per logical file. Handlers track stats per part. Multiple handlers could be updating the same logical file. + // To allow each handler to update the same logical file without overwriting the other handlers updates, the fileStats are 'merged' into fileStats[] + // So, reset required because the handlers merge stats rather than sets them + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + keyLookupHandlers.getFileStats(fileStats, indexFileStatsTableEntry); fetchLookupHandlers.getFileStats(fileStats, dataFileStatsTableEntry); - + for (auto & fileStatItem: fileStats) + activeStats.merge(*fileStatItem); + } + virtual void serializeStats(MemoryBuffer &mb) override + { PARENT::serializeStats(mb); mb.append((unsigned)fileStats.size()); for (auto &stats: fileStats) diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 8d9f85a2001..f4dd1734768 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -680,12 +680,14 @@ void CMasterActivity::updateFileReadCostStats() if (fileStats.size()>0) { + ThorActivityKind activityKind = container.getKind(); unsigned fileIndex = 0; - for (unsigned i=0; i> fileStats; + // fileStats is mutable as it is updated by gatherActiveStats (const member func) + // fileStats is in this base class as it used by multiple derived classes (both slave and master) but not all. + // (Having it in the base class aids setup and resizing.) + mutable std::vector> fileStats; protected: unsigned __int64 queryLocalCycles() const; diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 9eb7d43f8bf..c82f2bce2a4 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -78,7 +78,8 @@ const StatisticsMapping soapcallStatistics({StTimeSoapcall}); const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics); -const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexReadFileStatistics(diskReadRemoteStatistics, jhtreeCacheStatistics); +const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics); const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index ee667d4246c..5801109bf7c 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -154,6 +154,7 @@ extern graph_decl const StatisticsMapping graphStatistics; extern graph_decl const StatisticsMapping indexDistribActivityStatistics; extern graph_decl const StatisticsMapping soapcallActivityStatistics; +extern graph_decl const StatisticsMapping indexReadFileStatistics; class BooleanOnOff { bool &tf;