diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 4a245bb24a7..0a2944cbf53 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -370,6 +370,7 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface CKeyLevelManager(const RtlRecord &_recInfo, IKeyIndex * _key, IContextLogger *_ctx, bool _newFilters, bool _logExcessiveSeeks) : stats(_ctx), newFilters(_newFilters), logExcessiveSeeks(_logExcessiveSeeks) { + DBGLOG("CKeyLevelManager this %p ctx %p", this, _ctx); if (newFilters) filter.setown(new IndexRowFilter(_recInfo)); else @@ -497,6 +498,7 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface virtual bool lookup(bool exact) { + DBGLOG("CKeyLevelManager this %p ctx %p", this, stats.ctx); if (keyCursor) return keyCursor->lookup(exact, stats); else @@ -2642,8 +2644,13 @@ const CJHTreeNode *CNodeCache::getNode(const INodeLoader *keyIndex, unsigned iD, } else { - if (ctx) ctx->noteStatistic(addStatId[cacheType], 1); - (*addMetric[cacheType])++; + try { + if (ctx) ctx->noteStatistic(addStatId[cacheType], 1); + (*addMetric[cacheType])++; + } catch (...) + { + DBGLOG("CNodeCache::getNode ctx %p", ctx); + } } //The common case is that this flag has already been set (by a previous add). diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index b4c7d0cd7cc..e4b13a4a425 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity protected: StringAttr logicalFilename; IArrayOf partDescs; + bool isSuperFile = false; IHThorIndexReadBaseArg *helper; IHThorSourceLimitTransformExtra * limitTransformExtra; Owned allocator; @@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity rowcount_t rowLimit = RCMAX; bool useRemoteStreaming = false; Owned lazyIFileIO; - mutable CriticalSection ioStatsCS; + mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers unsigned fileTableStart = NotFound; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; + std::vector> contextLoggers; class TransformCallback : implements IThorIndexCallback , public CSimpleInterface { @@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity if (!keyManager) throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?"); needsBlobCleaning = true; - return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger); + return (byte *) keyManager->loadBlob(id, dummy, nullptr); } void prepareManager(IKeyManager *_keyManager) { @@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity unsigned projectedFormatCrc = helper->getProjectedFormatCrc(); IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize(); - unsigned p = partNum; - while (p keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false); - Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLoggers[p], helper->hasNewSegmentMonitors(), false); if (localMerge) { if (!keyIndexSet) @@ -315,7 +314,10 @@ class CIndexReadSlaveBase : public CSlaveActivity translators.append(translator.getClear()); } keyIndexSet->addIndex(keyIndex.getClear()); - keyManagers.append(*klManager.getLink()); + { + CriticalBlock b(keyManagersCS); + keyManagers.append(*klManager.getLink()); + } keyManager = klManager; } else @@ -325,13 +327,17 @@ class CIndexReadSlaveBase : public CSlaveActivity if (translator) klManager->setLayoutTranslator(&translator->queryTranslator()); translators.append(translator.getClear()); - keyManagers.append(*klManager.getLink()); + { + CriticalBlock b(keyManagersCS); + keyManagers.append(*klManager.getLink()); + } keyManager = klManager; - partNum = p; + partNum = (p+1); return createIndexLookup(keyManager); } } - keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false)); + //Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider + keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false)); const ITranslator *translator = translators.item(0); if (translator) keyMergerManager->setLayoutTranslator(&translator->queryTranslator()); @@ -348,40 +354,12 @@ class CIndexReadSlaveBase : public CSlaveActivity else return nullptr; } - void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO) - { - if (fileStats.size()>0) - { - ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor(); - if (superFDesc) - { - unsigned subfile, lnum; - if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum)) - mergeStats(*fileStats[fileTableStart+subfile], partIO); - } - else - mergeStats(*fileStats[fileTableStart], partIO); - } - } - void updateStats() - { - // NB: updateStats() should always be called whilst ioStatsCS is held. - if (lazyIFileIO) - { - mergeStats(inactiveStats, lazyIFileIO); - if (currentPartqueryHelper(); limitTransformExtra = nullptr; @@ -555,7 +532,6 @@ class CIndexReadSlaveBase : public CSlaveActivity break; if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (hard) // checkCount checks hard key count only. count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count] else @@ -589,7 +565,10 @@ class CIndexReadSlaveBase : public CSlaveActivity partDescs.kill(); keyIndexSet.clear(); translators.kill(); - keyManagers.kill(); + { + CriticalBlock b(keyManagersCS); + keyManagers.kill(); + } keyMergerManager.clear(); } else @@ -607,6 +586,7 @@ class CIndexReadSlaveBase : public CSlaveActivity IPartDescriptor &part0 = partDescs.item(0); IFileDescriptor &fileDesc = part0.queryOwner(); ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor(); + isSuperFile = super != nullptr; if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge) { @@ -684,7 +664,15 @@ class CIndexReadSlaveBase : public CSlaveActivity } } data.read(fileTableStart); - setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics); + setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics); + // One contextLoggers per part required: + // 1) superfile: multiple contextLoggers required to allow stats to be tracked at subfile level + // 2) non-superfile: although all stats doesn't need to be tracked at part level, having separate contextLoggers is needed to + // merge both io stats and jhtree stats. (Without multiple contextLoggers, when ForEach(keyManagers)mergeStats() is called, + // the same jhtree stats would be merged multiple times. And ForEach(keyManagers)->mergeStats needs to be called to ensure + // that io stats are merged) + for(unsigned i = 0; i < parts; ++i) + contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob)); } } // IThorDataLink @@ -719,10 +707,42 @@ class CIndexReadSlaveBase : public CSlaveActivity virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { PARENT::gatherActiveStats(activeStats); + if (partDescs.ordinality()) { - CriticalBlock b(ioStatsCS); - if (lazyIFileIO) - mergeStats(activeStats, lazyIFileIO); + // reset required because within loop below, mergeStats() is used to build up stats for each file + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor(); + for (unsigned partNum=0; partNummapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum)) + continue; // should not happen + } + IKeyManager * keyManager; + { + CriticalBlock b(keyManagersCS); + if (!keyManagers.isItem(partNum)) + continue; + keyManager = &keyManagers.item(partNum); + } + if (fileStats.size()>0) + { + CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile]; + keyManager->mergeStats(*fileStatItem); // for file level stats + activeStats.merge(*fileStatItem); // for activity level stats + } + else + { + // when just 1 file, merge into activeStats (can use activeStats for file level stats) + keyManager->mergeStats(activeStats); + } + } } activeStats.setStatistic(StNumRowsProcessed, progress); } @@ -735,11 +755,7 @@ class CIndexReadSlaveBase : public CSlaveActivity } virtual void done() override { - { - CriticalBlock b(ioStatsCS); - updateStats(); - lazyIFileIO.clear(); - } + lazyIFileIO.clear(); PARENT::done(); } }; @@ -819,7 +835,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset... rawSeek = (byte *)temp; } - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize)) return NULL; const byte *row = currentManager->queryKeyBuffer(); @@ -972,7 +987,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase // IRowStream virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled { keyedLimitCount = sendGetCount(keyedProcessed); @@ -1142,7 +1156,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey(); @@ -1301,7 +1314,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey();