Skip to content

Commit

Permalink
HPCC-30599 Fix file access costs for keyed join
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
jakesmith authored and shamser committed Nov 13, 2023
1 parent bd658b4 commit 9742b67
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 23 deletions.
7 changes: 7 additions & 0 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWr
return accessCost;
}

extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads)
{
StringBuffer clusterName;
f->getClusterName(0, clusterName);
return money2cost_type(calcFileAccessCost(clusterName, numDiskWrites, numDiskReads));
}

RemoteFilename &constructPartFilename(IGroup *grp,unsigned partno,unsigned partmax,const char *name,const char *partmask,const char *partdir,unsigned copy,ClusterPartDiskMapSpec &mspec,RemoteFilename &rfn)
{
partno--;
Expand Down
1 change: 1 addition & 0 deletions dali/base/dadfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,7 @@ extern da_decl void ensureFileScope(const CDfsLogicalFileName &dlfn, unsigned ti
extern da_decl bool checkLogicalName(const char *lfn,IUserDescriptor *user,bool readreq,bool createreq,bool allowquery,const char *specialnotallowedmsg);
extern da_decl void calcFileCost(const char * cluster, double sizeGB, double fileAgeDays, __int64 numDiskWrites, __int64 numDiskReads, double & atRestCost, double & accessCost);
extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads);
extern da_decl cost_type calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads);
constexpr bool defaultPrivilegedUser = true;
constexpr bool defaultNonPrivilegedUser = false;

Expand Down
6 changes: 4 additions & 2 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,12 @@ class jhtree_decl CKeyLevelManager : implements IKeyManager, public CInterface
filter->describe(out);
}

virtual void mergeStats(CRuntimeStatisticCollection & stats) const
virtual void mergeStats(CRuntimeStatisticCollection & targetStats) const
{
if (keyCursor)
keyCursor->mergeStats(stats);
keyCursor->mergeStats(targetStats);
if (stats.ctx)
targetStats.merge(stats.ctx->queryStats());
}
};

Expand Down
3 changes: 1 addition & 2 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1322,8 +1322,7 @@ const StatisticsMapping noStatistics({});
const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks,
StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles,
StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches,
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips,
StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch});
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips, StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch, StTimeNodeLoad, StTimeNodeRead});

const StatisticsMapping allStatistics(StKindAll);
const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans});
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ class jlib_decl CRuntimeStatistic
};

class CNestedRuntimeStatisticMap;

extern jlib_decl const char * queryStatisticName(StatisticKind kind);
//The CRuntimeStatisticCollection used to gather statistics for an activity - it has no notion of its scope, but can contain nested scopes.
//Some of the functions have node parameters which have no meaning for the base implementation, but are used by the derived class
//CRuntimeSummaryStatisticCollection which is used fro summarising stats from multiple different worker nodes.
Expand Down
33 changes: 21 additions & 12 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
{
protected:
CKeyedJoinSlave &activity;
CStatsContextLogger contextLogger;
CStatsCtxLoggerDeltaUpdater statsUpdater;
IThorRowInterfaces *rowIf;
IHThorKeyedJoinArg *helper = nullptr;
std::vector<CThorExpandingRowArray *> queues;
Expand Down Expand Up @@ -995,7 +997,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
public:
CLookupHandler(CKeyedJoinSlave &_activity, IThorRowInterfaces *_rowIf, unsigned _batchProcessLimit) : threaded("CLookupHandler", this),
activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit)
activity(_activity), rowIf(_rowIf), batchProcessLimit(_batchProcessLimit),
contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, _activity, contextLogger)
{
helper = activity.helper;
}
Expand Down Expand Up @@ -1084,6 +1087,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void stop()
{
CStatsScopedDeltaUpdater scoped(statsUpdater);
stopped = true;
join();
for (auto &queue : queues)
Expand Down Expand Up @@ -1155,6 +1159,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void flush()
{
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
// NB: queueLookup() must be protected from re-entry by caller
for (unsigned b=0; b<batchArrays.size(); b++)
{
Expand Down Expand Up @@ -1283,7 +1288,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void processRows(CThorExpandingRowArray &processing, unsigned partNo, IKeyManager *keyManager)
{
CStatsScopedThresholdDeltaUpdater scoped(activity.statsUpdater);
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
for (unsigned r=0; r<processing.ordinality() && !stopped; r++)
{
OwnedConstThorRow row = processing.getClear(r);
Expand All @@ -1308,7 +1313,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
joinGroup->setAtMostLimitHit(); // also clears existing rows
break;
}
KLBlobProviderAdapter adapter(keyManager, &activity.contextLogger);
KLBlobProviderAdapter adapter(keyManager, &contextLogger);
byte const * keyRow = keyManager->queryKeyBuffer();
size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t);
offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset);
Expand Down Expand Up @@ -1381,7 +1386,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
std::atomic<IKeyManager *> & keyManager = (*keyManagers)[selected];
if (!keyManager) // delayed until actually needed
{
keyManager = activity.createPartKeyManager(partNo, copy);
keyManager = activity.createPartKeyManager(partNo, copy, &contextLogger);
// NB: potentially translation per part could be different if dealing with superkeys
setupTranslation(partNo, selected, *keyManager);
}
Expand All @@ -1400,7 +1405,9 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
keyManager->mergeStats(*fileStats[startOffset+subfile]);
}
else
{
keyManager->mergeStats(*fileStats[startOffset]);
}
}
}
};
Expand Down Expand Up @@ -1749,10 +1756,15 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
joinGroup->decPending(); // Every queued lookup row triggered an inc., this is the corresponding dec.
}
unsigned __int64 seeks, scans, wildseeks;
unsigned __int64 nodeDiskFetches, leafDiskFetches, blobDiskFetches;
mb.read(seeks).read(scans).read(wildseeks);
mb.read(nodeDiskFetches).read(leafDiskFetches).read(blobDiskFetches);
activity.inactiveStats.sumStatistic(StNumIndexSeeks, seeks);
activity.inactiveStats.sumStatistic(StNumIndexScans, scans);
activity.inactiveStats.sumStatistic(StNumIndexWildSeeks, wildseeks);
activity.inactiveStats.sumStatistic(StNumNodeDiskFetches, nodeDiskFetches);
activity.inactiveStats.sumStatistic(StNumLeafDiskFetches, leafDiskFetches);
activity.inactiveStats.sumStatistic(StNumBlobDiskFetches, blobDiskFetches);
if (received == numRows)
break;
}
Expand Down Expand Up @@ -1802,6 +1814,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
virtual void process(CThorExpandingRowArray &processing, unsigned selected) override
{
CStatsScopedThresholdDeltaUpdater scopedStats(statsUpdater);
unsigned partCopy = parts[selected];
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> partBits;
Expand Down Expand Up @@ -2222,8 +2235,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
CPartDescriptorArray allIndexParts;
std::vector<unsigned> localIndexParts, localFetchPartMap;
IArrayOf<IKeyIndex> tlkKeyIndexes;
CStatsContextLogger contextLogger;
CStatsCtxLoggerDeltaUpdater statsUpdater;
Owned<IEngineRowAllocator> joinFieldsAllocator;
OwnedConstThorRow defaultRight;
unsigned joinFlags = 0;
Expand Down Expand Up @@ -2419,7 +2430,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
{
IKeyIndex *tlkKeyIndex = &tlkKeyIndexes.item(i);
const RtlRecord &keyRecInfo = helper->queryIndexRecordSize()->queryRecordAccessor(true);
Owned<IKeyManager> tlkManager = createLocalKeyManager(keyRecInfo, nullptr, &contextLogger, helper->hasNewSegmentMonitors(), false);
Owned<IKeyManager> tlkManager = createLocalKeyManager(keyRecInfo, nullptr, nullptr, helper->hasNewSegmentMonitors(), false);
tlkManager->setKey(tlkKeyIndex);
keyManagers.append(*tlkManager.getClear());
}
Expand Down Expand Up @@ -2451,10 +2462,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false);
}
}
IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy)
IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx)
{
Owned<IKeyIndex> keyIndex = createPartKeyIndex(partNo, copy, false);
return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false);
return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false);
}
const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz)
{
Expand Down Expand Up @@ -2616,7 +2627,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
void stopReadAhead()
{
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
keyLookupHandlers.flush();
keyLookupHandlers.join(); // wait for pending handling, there may be more fetch items as a result
fetchLookupHandlers.flushTS();
Expand Down Expand Up @@ -2938,7 +2948,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
public:
IMPLEMENT_IINTERFACE_USING(PARENT);

CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this)
{
helper = static_cast <IHThorKeyedJoinArg *> (queryHelper());
reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
Expand Down Expand Up @@ -3368,7 +3378,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
virtual void stop() override
{
CStatsScopedDeltaUpdater scoped(statsUpdater);
endOfInput = true; // signals to readAhead which is reading input, that is should stop asap.

// could be blocked in readAhead(), because CJoinGroup's are no longer being processed
Expand Down
25 changes: 19 additions & 6 deletions thorlcr/graph/thgraphmaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,20 @@ void CMasterActivity::updateFileReadCostStats()
IDistributedFile *file = queryReadFile(i);
if (file)
{
ThorActivityKind activityKind = container.getKind();
bool usesJhtreeCache = false;
if (TAKkeyedjoin == activityKind || TAKindexread == activityKind)
{
// Index uses jhtree caches, so use actual fetches to calculate cost
usesJhtreeCache = true;
if (0 == i)
{
stat_type numDiskReads = fileStats[fileIndex]->getStatisticSum(StNumNodeDiskFetches)
+ fileStats[fileIndex]->getStatisticSum(StNumLeafDiskFetches)
+ fileStats[fileIndex]->getStatisticSum(StNumBlobDiskFetches);
diskAccessCost += calcFileAccessCost(file, 0, numDiskReads);
}
}
IDistributedSuperFile *super = file->querySuperFile();
if (super)
{
Expand All @@ -665,19 +679,18 @@ void CMasterActivity::updateFileReadCostStats()
{
IDistributedFile &subFile = super->querySubFile(i, true);
stat_type numDiskReads = fileStats[fileIndex]->getStatisticSum(StNumDiskReads);
StringBuffer clusterName;
subFile.getClusterName(0, clusterName);
diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
if (!usesJhtreeCache)
diskAccessCost += calcFileAccessCost(&subFile, 0, numDiskReads);
subFile.addAttrValue("@numDiskReads", numDiskReads);
fileIndex++;
}
}
else
{
stat_type numDiskReads = fileStats[fileIndex]->getStatisticSum(StNumDiskReads);
StringBuffer clusterName;
file->getClusterName(0, clusterName);
diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
if (!usesJhtreeCache)
diskAccessCost += calcFileAccessCost(file, 0, numDiskReads);

file->addAttrValue("@numDiskReads", numDiskReads);
fileIndex++;
}
Expand Down
6 changes: 6 additions & 0 deletions thorlcr/slave/slavmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,9 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
unsigned __int64 startSeeks = stats.getStatisticValue(StNumIndexSeeks);
unsigned __int64 startScans = stats.getStatisticValue(StNumIndexScans);
unsigned __int64 startWildSeeks = stats.getStatisticValue(StNumIndexWildSeeks);
unsigned __int64 startNodeDiskFetches = stats.getStatisticValue(StNumNodeDiskFetches);
unsigned __int64 startLeafDiskFetches = stats.getStatisticValue(StNumLeafDiskFetches);
unsigned __int64 startBlobDiskFetches = stats.getStatisticValue(StNumBlobDiskFetches);
while (!abortSoon)
{
OwnedConstThorRow row = getRowClear(rowNum++);
Expand All @@ -773,6 +776,9 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
replyMb.append(stats.getStatisticValue(StNumIndexSeeks)-startSeeks);
replyMb.append(stats.getStatisticValue(StNumIndexScans)-startScans);
replyMb.append(stats.getStatisticValue(StNumIndexWildSeeks)-startWildSeeks);
replyMb.append(stats.getStatisticValue(StNumNodeDiskFetches)-startNodeDiskFetches);
replyMb.append(stats.getStatisticValue(StNumLeafDiskFetches)-startLeafDiskFetches);
replyMb.append(stats.getStatisticValue(StNumBlobDiskFetches)-startBlobDiskFetches);
if (activityCtx->useMessageCompression())
{
fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray());
Expand Down

0 comments on commit 9742b67

Please sign in to comment.