Skip to content

Commit

Permalink
HPCC-30599 Fix file access costs for keyed join
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
jakesmith authored and shamser committed Nov 24, 2023
1 parent d00eb36 commit 9f160f7
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 25 deletions.
7 changes: 7 additions & 0 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWri
return calcFileAccessCost(clusterName, numDiskWrites, numDiskReads);
}

// Computes the access cost of a distributed file, expressed as a cost_type.
//   f              - file whose cluster name is used to price the access
//   numDiskWrites  - number of disk writes to price
//   numDiskReads   - number of disk reads to price
// The cluster name is obtained with getClusterName(0, ...), the counts are priced by the
// calcFileAccessCost overload that takes a cluster name, and the result is converted
// with money2cost_type.
// NOTE(review): only cluster index 0 is consulted - confirm that is adequate for files on several clusters.
extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads)
{
    StringBuffer cluster;
    f->getClusterName(0, cluster);
    const auto accessCost = calcFileAccessCost(cluster, numDiskWrites, numDiskReads);
    return money2cost_type(accessCost);
}

RemoteFilename &constructPartFilename(IGroup *grp,unsigned partno,unsigned partmax,const char *name,const char *partmask,const char *partdir,unsigned copy,ClusterPartDiskMapSpec &mspec,RemoteFilename &rfn)
{
partno--;
Expand Down
6 changes: 4 additions & 2 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,12 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface
filter->describe(out);
}

virtual void mergeStats(CRuntimeStatisticCollection & stats) const
virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const
{
if (keyCursor)
keyCursor->mergeStats(stats);
keyCursor->mergeStats(targetStats);
if (stats.ctx)
targetStats.merge(stats.ctx->queryStats());
}
};

Expand Down
3 changes: 1 addition & 2 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1322,8 +1322,7 @@ const StatisticsMapping noStatistics({});
const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks,
StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles,
StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches,
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips,
StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch});
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch, StTimeNodeLoad, StTimeNodeRead});

const StatisticsMapping allStatistics(StKindAll);
const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans});
Expand Down
8 changes: 6 additions & 2 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,17 +350,21 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
{
if (!currentManager)
return;
if (fileStats.size()>0)
{
ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
if (superFDesc)
{
unsigned subfile, lnum;
if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
mergeStats(*fileStats[fileTableStart+subfile], partIO);
currentManager->mergeStats(*fileStats[fileTableStart+subfile]);
}
else
mergeStats(*fileStats[fileTableStart], partIO);
{
currentManager->mergeStats(*fileStats[fileTableStart]);
}
}
}
void updateStats()
Expand Down
31 changes: 19 additions & 12 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
{
protected:
CKeyedJoinSlave &activity;
CStatsContextLogger contextLogger;
CStatsCtxLoggerDeltaUpdater statsUpdater;
IThorRowInterfaces *rowIf;
IHThorKeyedJoinArg *helper = nullptr;
std::vector<CThorExpandingRowArray *> queues;
Expand Down Expand Up @@ -995,7 +997,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
public:
CLookupHandler(CKeyedJoinSlave &_activity, IThorRowInterfaces *_rowIf, unsigned _batchProcessLimit) : threaded("CLookupHandler", this),
activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit)
activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit),
contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, _activity, contextLogger)
{
helper = activity.helper;
}
Expand Down Expand Up @@ -1084,6 +1087,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void stop()
{
CStatsScopedDeltaUpdater scoped(statsUpdater);
stopped = true;
join();
for (auto &queue : queues)
Expand Down Expand Up @@ -1155,6 +1159,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void flush()
{
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
// NB: queueLookup() must be protected from re-entry by caller
for (unsigned b=0; b<batchArrays.size(); b++)
{
Expand Down Expand Up @@ -1283,7 +1288,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void processRows(CThorExpandingRowArray &processing, unsigned partNo, IKeyManager *keyManager)
{
CStatsScopedThresholdDeltaUpdater scoped(activity.statsUpdater);
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
for (unsigned r=0; r<processing.ordinality() && !stopped; r++)
{
OwnedConstThorRow row = processing.getClear(r);
Expand All @@ -1308,7 +1313,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
joinGroup->setAtMostLimitHit(); // also clears existing rows
break;
}
KLBlobProviderAdapter adapter(keyManager, &activity.contextLogger);
KLBlobProviderAdapter adapter(keyManager, &contextLogger);
byte const * keyRow = keyManager->queryKeyBuffer();
size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t);
offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset);
Expand Down Expand Up @@ -1381,7 +1386,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
std::atomic<IKeyManager *> & keyManager = (*keyManagers)[selected];
if (!keyManager) // delayed until actually needed
{
keyManager = activity.createPartKeyManager(partNo, copy);
keyManager = activity.createPartKeyManager(partNo, copy, &contextLogger);
// NB: potentially translation per part could be different if dealing with superkeys
setupTranslation(partNo, selected, *keyManager);
}
Expand Down Expand Up @@ -1749,10 +1754,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
joinGroup->decPending(); // Every queued lookup row triggered an inc., this is the corresponding dec.
}
unsigned __int64 seeks, scans, wildseeks;
unsigned __int64 nodeDiskFetches, leafDiskFetches, blobDiskFetches;
mb.read(seeks).read(scans).read(wildseeks);
mb.read(nodeDiskFetches).read(leafDiskFetches).read(blobDiskFetches);
activity.inactiveStats.sumStatistic(StNumIndexSeeks, seeks);
activity.inactiveStats.sumStatistic(StNumIndexScans, scans);
activity.inactiveStats.sumStatistic(StNumIndexWildSeeks, wildseeks);
activity.inactiveStats.sumStatistic(StNumNodeDiskFetches, nodeDiskFetches);
activity.inactiveStats.sumStatistic(StNumLeafDiskFetches, leafDiskFetches);
activity.inactiveStats.sumStatistic(StNumBlobDiskFetches, blobDiskFetches);
if (received == numRows)
break;
}
Expand Down Expand Up @@ -1802,6 +1812,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
virtual void process(CThorExpandingRowArray &processing, unsigned selected) override
{
CStatsScopedThresholdDeltaUpdater scopedStats(statsUpdater);
unsigned partCopy = parts[selected];
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> partBits;
Expand Down Expand Up @@ -2222,8 +2233,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
CPartDescriptorArray allIndexParts;
std::vector<unsigned> localIndexParts, localFetchPartMap;
IArrayOf<IKeyIndex> tlkKeyIndexes;
CStatsContextLogger contextLogger;
CStatsCtxLoggerDeltaUpdater statsUpdater;
Owned<IEngineRowAllocator> joinFieldsAllocator;
OwnedConstThorRow defaultRight;
unsigned joinFlags = 0;
Expand Down Expand Up @@ -2419,7 +2428,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
{
IKeyIndex *tlkKeyIndex = &tlkKeyIndexes.item(i);
const RtlRecord &keyRecInfo = helper->queryIndexRecordSize()->queryRecordAccessor(true);
Owned<IKeyManager> tlkManager = createLocalKeyManager(keyRecInfo, nullptr, &contextLogger, helper->hasNewSegmentMonitors(), false);
Owned<IKeyManager> tlkManager = createLocalKeyManager(keyRecInfo, nullptr, nullptr, helper->hasNewSegmentMonitors(), false);
tlkManager->setKey(tlkKeyIndex);
keyManagers.append(*tlkManager.getClear());
}
Expand Down Expand Up @@ -2451,10 +2460,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false);
}
}
IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy)
IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx)
{
Owned<IKeyIndex> keyIndex = createPartKeyIndex(partNo, copy, false);
return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false);
return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false);
}
const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz)
{
Expand Down Expand Up @@ -2616,7 +2625,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void stopReadAhead()
{
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
keyLookupHandlers.flush();
keyLookupHandlers.join(); // wait for pending handling, there may be more fetch items as a result
fetchLookupHandlers.flushTS();
Expand Down Expand Up @@ -2938,7 +2946,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
public:
IMPLEMENT_IINTERFACE_USING(PARENT);

CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this)
{
helper = static_cast <IHThorKeyedJoinArg *> (queryHelper());
reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
Expand Down Expand Up @@ -3368,7 +3376,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
virtual void stop() override
{
CStatsScopedDeltaUpdater scoped(statsUpdater);
endOfInput = true; // signals to readAhead which is reading input, that is should stop asap.

// could be blocked in readAhead(), because CJoinGroup's are no longer being processed
Expand Down
26 changes: 19 additions & 7 deletions thorlcr/graph/thgraphmaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,21 +650,29 @@ void CMasterActivity::done()
void CMasterActivity::updateFileReadCostStats()
{
// Updates numDiskReads & readCost in the file attributes and returns the readCost
auto updateReadCosts = [](IDistributedFile *file, CThorStatsCollection &stats)
auto updateReadCosts = [](bool useJhtreeCacheStats, IDistributedFile *file, CThorStatsCollection &stats)
{
stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads);
StringBuffer clusterName;
file->getClusterName(0, clusterName);
cost_type legacyReadCost = 0, curReadCost = 0;
IPropertyTree & fileAttr = file->queryAttributes();
cost_type legacyReadCost = 0, curReadCost = 0;
// Legacy files will not have the readCost stored as an attribute
if (!hasReadWriteCostFields(&fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads)))
{
// Legacy file: calculate readCost using prev disk reads and new disk reads
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
legacyReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, prevDiskReads));
}
curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
stat_type curDiskReads = stats.getStatisticSum(StNumDiskReads);
if(useJhtreeCacheStats)
{
stat_type numActualReads = stats.getStatisticSum(StNumNodeDiskFetches)
+ stats.getStatisticSum(StNumLeafDiskFetches)
+ stats.getStatisticSum(StNumBlobDiskFetches);
curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, numActualReads));
}
else
curReadCost = money2cost_type(calcFileAccessCost(clusterName, 0, curDiskReads));
file->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost + curReadCost);
file->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), curDiskReads);
return curReadCost;
Expand All @@ -676,6 +684,10 @@ void CMasterActivity::updateFileReadCostStats()
for (unsigned i=0; i<readFiles.size();i++)
{
IDistributedFile *file = queryReadFile(i);
bool useJhtreeCache = false;
// Index uses jhtree caches, so use actual fetches to calculate cost
if ((TAKkeyedjoin == activityKind || TAKindexread == activityKind) && (0 == i))
useJhtreeCache = true;
if (file)
{
IDistributedSuperFile *super = file->querySuperFile();
Expand All @@ -685,13 +697,13 @@ void CMasterActivity::updateFileReadCostStats()
for (unsigned i=0; i<numSubFiles; i++)
{
IDistributedFile &subFile = super->querySubFile(i, true);
diskAccessCost += updateReadCosts(&subFile, *fileStats[fileIndex]);
diskAccessCost += updateReadCosts(useJhtreeCache, &subFile, *fileStats[fileIndex]);
fileIndex++;
}
}
else
{
diskAccessCost += updateReadCosts(file, *fileStats[fileIndex]);
diskAccessCost += updateReadCosts(useJhtreeCache, file, *fileStats[fileIndex]);
fileIndex++;
}
}
Expand All @@ -701,7 +713,7 @@ void CMasterActivity::updateFileReadCostStats()
{
IDistributedFile *file = queryReadFile(0);
if (file)
diskAccessCost += updateReadCosts(file, statsCollection);
diskAccessCost += updateReadCosts(useJhtreeCache, file, statsCollection);
}
}

Expand Down
6 changes: 6 additions & 0 deletions thorlcr/slave/slavmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,9 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
unsigned __int64 startSeeks = stats.getStatisticValue(StNumIndexSeeks);
unsigned __int64 startScans = stats.getStatisticValue(StNumIndexScans);
unsigned __int64 startWildSeeks = stats.getStatisticValue(StNumIndexWildSeeks);
unsigned __int64 startNodeDiskFetches = stats.getStatisticValue(StNumNodeDiskFetches);
unsigned __int64 startLeafDiskFetches = stats.getStatisticValue(StNumLeafDiskFetches);
unsigned __int64 startBlobDiskFetches = stats.getStatisticValue(StNumBlobDiskFetches);
while (!abortSoon)
{
OwnedConstThorRow row = getRowClear(rowNum++);
Expand All @@ -773,6 +776,9 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
replyMb.append(stats.getStatisticValue(StNumIndexSeeks)-startSeeks);
replyMb.append(stats.getStatisticValue(StNumIndexScans)-startScans);
replyMb.append(stats.getStatisticValue(StNumIndexWildSeeks)-startWildSeeks);
replyMb.append(stats.getStatisticValue(StNumNodeDiskFetches)-startNodeDiskFetches);
replyMb.append(stats.getStatisticValue(StNumLeafDiskFetches)-startLeafDiskFetches);
replyMb.append(stats.getStatisticValue(StNumBlobDiskFetches)-startBlobDiskFetches);
if (activityCtx->useMessageCompression())
{
fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray());
Expand Down

0 comments on commit 9f160f7

Please sign in to comment.