Skip to content

Commit

Permalink
HPCC-30993 Fix file access for index read activity
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
jakesmith authored and shamser committed Dec 13, 2023
1 parent 8681024 commit e7c67fe
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 61 deletions.
11 changes: 9 additions & 2 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface
CKeyLevelManager(const RtlRecord &_recInfo, IKeyIndex * _key, IContextLogger *_ctx, bool _newFilters, bool _logExcessiveSeeks)
: stats(_ctx), newFilters(_newFilters), logExcessiveSeeks(_logExcessiveSeeks)
{
DBGLOG("CKeyLevelManager this %p ctx %p", this, _ctx);
if (newFilters)
filter.setown(new IndexRowFilter(_recInfo));
else
Expand Down Expand Up @@ -497,6 +498,7 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface

virtual bool lookup(bool exact)
{
DBGLOG("CKeyLevelManager this %p ctx %p", this, stats.ctx);
if (keyCursor)
return keyCursor->lookup(exact, stats);
else
Expand Down Expand Up @@ -2642,8 +2644,13 @@ const CJHTreeNode *CNodeCache::getNode(const INodeLoader *keyIndex, unsigned iD,
}
else
{
if (ctx) ctx->noteStatistic(addStatId[cacheType], 1);
(*addMetric[cacheType])++;
try {
if (ctx) ctx->noteStatistic(addStatId[cacheType], 1);
(*addMetric[cacheType])++;
} catch (...)
{
DBGLOG("CNodeCache::getNode ctx %p", ctx);
}
}

//The common case is that this flag has already been set (by a previous add).
Expand Down
130 changes: 71 additions & 59 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
protected:
StringAttr logicalFilename;
IArrayOf<IPartDescriptor> partDescs;
bool isSuperFile = false;
IHThorIndexReadBaseArg *helper;
IHThorSourceLimitTransformExtra * limitTransformExtra;
Owned<IEngineRowAllocator> allocator;
Expand Down Expand Up @@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
rowcount_t rowLimit = RCMAX;
bool useRemoteStreaming = false;
Owned<IFileIO> lazyIFileIO;
mutable CriticalSection ioStatsCS;
mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers
unsigned fileTableStart = NotFound;
CStatsContextLogger contextLogger;
CStatsCtxLoggerDeltaUpdater statsUpdater;
std::vector<Owned<CStatsContextLogger>> contextLoggers;

class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
{
Expand All @@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
if (!keyManager)
throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?");
needsBlobCleaning = true;
return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger);
return (byte *) keyManager->loadBlob(id, dummy, nullptr);
}
void prepareManager(IKeyManager *_keyManager)
{
Expand Down Expand Up @@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
unsigned projectedFormatCrc = helper->getProjectedFormatCrc();
IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize();

unsigned p = partNum;
while (p<partDescs.ordinality()) // will process all parts if localMerge
for (unsigned p = partNum; p<partDescs.ordinality(); p++) // will process all parts if localMerge
{
IPartDescriptor &part = partDescs.item(p++);
IPartDescriptor &part = partDescs.item(p);
unsigned crc=0;
part.getCrc(crc);

Expand Down Expand Up @@ -273,7 +272,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
continue; // try next copy and ultimately failover to local when no more copies
}
ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
partNum = p;
partNum = (p+1);
return indexLookup.getClear();
}
}
Expand All @@ -296,15 +295,15 @@ class CIndexReadSlaveBase : public CSlaveActivity
part.queryOwner().getClusterLabel(0, planeName);
blockedSize = getBlockedFileIOSize(planeName);
}
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize));

RemoteFilename rfn;
part.getFilename(0, rfn);
StringBuffer path;
rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy

Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false);
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false);
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLoggers[p], helper->hasNewSegmentMonitors(), false);
if (localMerge)
{
if (!keyIndexSet)
Expand All @@ -315,7 +314,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
translators.append(translator.getClear());
}
keyIndexSet->addIndex(keyIndex.getClear());
keyManagers.append(*klManager.getLink());
{
CriticalBlock b(keyManagersCS);
keyManagers.append(*klManager.getLink());
}
keyManager = klManager;
}
else
Expand All @@ -325,13 +327,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
if (translator)
klManager->setLayoutTranslator(&translator->queryTranslator());
translators.append(translator.getClear());
keyManagers.append(*klManager.getLink());
{
CriticalBlock b(keyManagersCS);
keyManagers.append(*klManager.getLink());
}
keyManager = klManager;
partNum = p;
partNum = (p+1);
return createIndexLookup(keyManager);
}
}
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false));
//Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false));
const ITranslator *translator = translators.item(0);
if (translator)
keyMergerManager->setLayoutTranslator(&translator->queryTranslator());
Expand All @@ -348,40 +354,12 @@ class CIndexReadSlaveBase : public CSlaveActivity
else
return nullptr;
}
void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
{
if (fileStats.size()>0)
{
ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
if (superFDesc)
{
unsigned subfile, lnum;
if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
mergeStats(*fileStats[fileTableStart+subfile], partIO);
}
else
mergeStats(*fileStats[fileTableStart], partIO);
}
}
void updateStats()
{
// NB: updateStats() should always be called whilst ioStatsCS is held.
if (lazyIFileIO)
{
mergeStats(inactiveStats, lazyIFileIO);
if (currentPart<partDescs.ordinality())
mergeFileStats(&partDescs.item(currentPart), lazyIFileIO);
}
}
void configureNextInput()
{
if (currentManager)
{
resetManager(currentManager);
currentManager = nullptr;

CriticalBlock b(ioStatsCS);
updateStats();
lazyIFileIO.clear();
}
IKeyManager *keyManager = nullptr;
Expand Down Expand Up @@ -433,7 +411,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
if (eoi)
return nullptr;
dbgassertex(currentInput);
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
const void *ret = nullptr;
while (true)
{
Expand Down Expand Up @@ -528,7 +505,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
public:
CIndexReadSlaveBase(CGraphElementBase *container)
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this)
{
helper = (IHThorIndexReadBaseArg *)container->queryHelper();
limitTransformExtra = nullptr;
Expand All @@ -555,7 +532,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
break;
if (keyManager)
prepareManager(keyManager);
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
if (hard) // checkCount checks hard key count only.
count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count]
else
Expand Down Expand Up @@ -589,7 +565,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
partDescs.kill();
keyIndexSet.clear();
translators.kill();
keyManagers.kill();
{
CriticalBlock b(keyManagersCS);
keyManagers.kill();
}
keyMergerManager.clear();
}
else
Expand All @@ -607,6 +586,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
IPartDescriptor &part0 = partDescs.item(0);
IFileDescriptor &fileDesc = part0.queryOwner();
ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor();
isSuperFile = super != nullptr;

if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge)
{
Expand Down Expand Up @@ -684,7 +664,15 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
}
data.read(fileTableStart);
setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics);
setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics);
// One contextLoggers per part required:
// 1) superfile: multiple contextLoggers required to allow stats to be tracked at subfile level
// 2) non-superfile: although all stats doesn't need to be tracked at part level, having separate contextLoggers is needed to
// merge both io stats and jhtree stats. (Without multiple contextLoggers, when ForEach(keyManagers)mergeStats() is called,
// the same jhtree stats would be merged multiple times. And ForEach(keyManagers)->mergeStats needs to be called to ensure
// that io stats are merged)
for(unsigned i = 0; i < parts; ++i)
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
}
}
// IThorDataLink
Expand Down Expand Up @@ -719,10 +707,42 @@ class CIndexReadSlaveBase : public CSlaveActivity
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
PARENT::gatherActiveStats(activeStats);
if (partDescs.ordinality())
{
CriticalBlock b(ioStatsCS);
if (lazyIFileIO)
mergeStats(activeStats, lazyIFileIO);
// reset required because within loop below, mergeStats() is used to build up stats for each file
for (auto & fileStatItem: fileStats)
fileStatItem->reset();
ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor();
for (unsigned partNum=0; partNum<partDescs.ordinality(); partNum++)
{
// If it is a superfile, track stats at subfile level with fileStats[fileTableStart+subfile]
// if not, one set of stats for the whole file with fileStats[fileTableStart]
unsigned subfile = 0;
if (isSuperFile) // if it's not superfile, subfile is always 0 (otherwise it is the subfile number)
{
unsigned lnum;
if(!superFDesc->mapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum))
continue; // should not happen
}
IKeyManager * keyManager;
{
CriticalBlock b(keyManagersCS);
if (!keyManagers.isItem(partNum))
continue;
keyManager = &keyManagers.item(partNum);
}
if (fileStats.size()>0)
{
CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile];
keyManager->mergeStats(*fileStatItem); // for file level stats
activeStats.merge(*fileStatItem); // for activity level stats
}
else
{
// when just 1 file, merge into activeStats (can use activeStats for file level stats)
keyManager->mergeStats(activeStats);
}
}
}
activeStats.setStatistic(StNumRowsProcessed, progress);
}
Expand All @@ -735,11 +755,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
virtual void done() override
{
{
CriticalBlock b(ioStatsCS);
updateStats();
lazyIFileIO.clear();
}
lazyIFileIO.clear();
PARENT::done();
}
};
Expand Down Expand Up @@ -819,7 +835,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
rawSeek = (byte *)temp;
}
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
return NULL;
const byte *row = currentManager->queryKeyBuffer();
Expand Down Expand Up @@ -972,7 +987,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
// IRowStream
virtual void stop() override
{
CStatsScopedDeltaUpdater scoped(statsUpdater);
if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled
{
keyedLimitCount = sendGetCount(keyedProcessed);
Expand Down Expand Up @@ -1142,7 +1156,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
if (keyManager)
prepareManager(keyManager);

CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
while (true)
{
const void *key = indexInput->nextKey();
Expand Down Expand Up @@ -1301,7 +1314,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase
if (keyManager)
prepareManager(keyManager);

CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
while (true)
{
const void *key = indexInput->nextKey();
Expand Down

0 comments on commit e7c67fe

Please sign in to comment.