diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index b4c7d0cd7cc..01bfd85b8e4 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity protected: StringAttr logicalFilename; IArrayOf partDescs; + bool isSuperFile = false; IHThorIndexReadBaseArg *helper; IHThorSourceLimitTransformExtra * limitTransformExtra; Owned allocator; @@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity rowcount_t rowLimit = RCMAX; bool useRemoteStreaming = false; Owned lazyIFileIO; - mutable CriticalSection ioStatsCS; + mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers unsigned fileTableStart = NotFound; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; + std::vector> contextLoggers; class TransformCallback : implements IThorIndexCallback , public CSimpleInterface { @@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity if (!keyManager) throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?"); needsBlobCleaning = true; - return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger); + return (byte *) keyManager->loadBlob(id, dummy, nullptr); } void prepareManager(IKeyManager *_keyManager) { @@ -296,7 +296,7 @@ class CIndexReadSlaveBase : public CSlaveActivity part.queryOwner().getClusterLabel(0, planeName); blockedSize = getBlockedFileIOSize(planeName); } - lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize)); + lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize)); RemoteFilename rfn; part.getFilename(0, rfn); @@ -304,7 +304,7 @@ class CIndexReadSlaveBase : public CSlaveActivity rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy Owned keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false); - Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLoggers[p], helper->hasNewSegmentMonitors(), false); if (localMerge) { if (!keyIndexSet) @@ -315,7 +315,10 @@ class CIndexReadSlaveBase : public CSlaveActivity translators.append(translator.getClear()); } keyIndexSet->addIndex(keyIndex.getClear()); - keyManagers.append(*klManager.getLink()); + { + CriticalBlock b(keyManagersCS); + keyManagers.append(*klManager.getLink()); + } keyManager = klManager; } else @@ -325,13 +328,17 @@ class CIndexReadSlaveBase : public CSlaveActivity if (translator) klManager->setLayoutTranslator(&translator->queryTranslator()); translators.append(translator.getClear()); - keyManagers.append(*klManager.getLink()); + { + CriticalBlock b(keyManagersCS); + keyManagers.append(*klManager.getLink()); + } keyManager = klManager; partNum = p; return createIndexLookup(keyManager); } } - keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false)); + //Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider + keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false)); const ITranslator *translator = translators.item(0); if (translator) keyMergerManager->setLayoutTranslator(&translator->queryTranslator()); @@ -348,40 +355,12 @@ class CIndexReadSlaveBase : public CSlaveActivity else return nullptr; } - void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO) - { - if (fileStats.size()>0) - { - ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor(); - if (superFDesc) - { - unsigned subfile, lnum; - if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum)) - mergeStats(*fileStats[fileTableStart+subfile], partIO); - } - else - mergeStats(*fileStats[fileTableStart], partIO); - } - } - void updateStats() - { - // NB: updateStats() should always be called whilst ioStatsCS is held. - if (lazyIFileIO) - { - mergeStats(inactiveStats, lazyIFileIO); - if (currentPartqueryHelper(); limitTransformExtra = nullptr; @@ -555,7 +533,6 @@ class CIndexReadSlaveBase : public CSlaveActivity break; if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (hard) // checkCount checks hard key count only. count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count] else @@ -589,7 +566,10 @@ class CIndexReadSlaveBase : public CSlaveActivity partDescs.kill(); keyIndexSet.clear(); translators.kill(); - keyManagers.kill(); + { + CriticalBlock b(keyManagersCS); + keyManagers.kill(); + } keyMergerManager.clear(); } else @@ -607,6 +587,7 @@ class CIndexReadSlaveBase : public CSlaveActivity IPartDescriptor &part0 = partDescs.item(0); IFileDescriptor &fileDesc = part0.queryOwner(); ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor(); + isSuperFile = super != nullptr; if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge) { @@ -684,7 +665,15 @@ class CIndexReadSlaveBase : public CSlaveActivity } } data.read(fileTableStart); - setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics); + setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics); + // One contextLoggers per part required: + // 1) superfile: multiple contextLoggers required to allow stats to be tracked at subfile level + // 2) non-superfile: although all stats doesn't need to be tracked at part level, having separate contextLoggers is needed to + // merge both io stats and jhtree stats. (Without multiple contextLoggers, when ForEach(keyManagers)mergeStats() is called, + // the same jhtree stats would be merged multiple times. And ForEach(keyManagers)->mergeStats needs to be called to ensure + // that io stats are merged) + for(unsigned i = 0; i < parts; ++i) + contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob)); } } // IThorDataLink @@ -719,10 +708,39 @@ class CIndexReadSlaveBase : public CSlaveActivity virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { PARENT::gatherActiveStats(activeStats); + if (partDescs.ordinality()) { - CriticalBlock b(ioStatsCS); - if (lazyIFileIO) - mergeStats(activeStats, lazyIFileIO); + // reset required because within loop below, mergeStats() is used to build up stats for each file + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor(); + for (unsigned partNum=0; partNummapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum)) + continue; // should not happen + } + IKeyManager * keyManager; + { + CriticalBlock b(keyManagersCS); + if (!keyManagers.isItem(partNum)) + continue; + keyManager = &keyManagers.item(partNum); + } + if (fileStats.size()>0) + keyManager->mergeStats(*fileStats[fileTableStart+subfile]); + else + keyManager->mergeStats(activeStats); // when just 1 file, merge into activeStats + } + // fileStats[] will be serialized separately so file level stats are tracked (see serializeStats() below) + // Also, merged into the activeStats for activity level stats + for (auto & fileStatItem: fileStats) + activeStats.merge(*fileStatItem); } activeStats.setStatistic(StNumRowsProcessed, progress); } @@ -733,15 +751,6 @@ class CIndexReadSlaveBase : public CSlaveActivity for (auto &stats: fileStats) stats->serialize(mb); } - virtual void done() override - { - { - CriticalBlock b(ioStatsCS); - updateStats(); - lazyIFileIO.clear(); - } - PARENT::done(); - } }; class CIndexReadSlaveActivity : public CIndexReadSlaveBase @@ -819,7 +828,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset... rawSeek = (byte *)temp; } - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize)) return NULL; const byte *row = currentManager->queryKeyBuffer(); @@ -972,7 +980,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase // IRowStream virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled { keyedLimitCount = sendGetCount(keyedProcessed); @@ -1142,7 +1149,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey(); @@ -1301,7 +1307,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey();