diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index 9995b70ddfd..aa133ad5ba2 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -307,6 +307,10 @@ enum StatisticKind StSizeRemoteWrite, StSizePeakTempDisk, StSizePeakEphemeralDisk, + StNumMatchLeftRowsMax, + StNumMatchRightRowsMax, + StNumMatchCandidates, + StNumMatchCandidatesMax, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 57733e6c259..92f02d1bdec 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -979,6 +979,10 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { SIZESTAT(RemoteWrite), "Size of data sent to remote workers"}, { PEAKSIZESTAT(PeakTempDisk), "High water mark for temporary files"}, { PEAKSIZESTAT(PeakEphemeralDisk), "High water mark for emphemeral storage use"}, + { NUMSTAT(MatchLeftRowsMax), "The largest number of left rows in a join group" }, + { NUMSTAT(MatchRightRowsMax), "The largest number of right rows in a join group" }, + { NUMSTAT(MatchCandidates), "The number of candidate combinations of left and right rows forming join groups" }, + { NUMSTAT(MatchCandidatesMax), "The largest number of candidate combinations of left and right rows in a single group" }, }; static MapStringTo statisticNameMap(true); @@ -3105,6 +3109,9 @@ static bool isWorthReportingMergedValue(StatisticKind kind) { case StSizePeakMemory: case StSizePeakRowMemory: + case StNumMatchLeftRowsMax: + case StNumMatchRightRowsMax: + case StNumMatchCandidatesMax: //These only make sense for individual nodes, the aggregated value is meaningless return false; } diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 0b814f94b63..61a49ced9a9 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -4042,6 +4042,7 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput strmR.clear(); { CriticalBlock b(joinHelperCrit); + joinhelper->gatherStats(inactiveStats); joinhelper.clear(); } PARENT::stop(); @@ -4087,6 +4088,7 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput } else { + joinhelper->gatherStats(activeStats); activeStats.setStatistic(StNumLeftRows, joinhelper->getLhsProgress()); activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress()); } diff --git a/thorlcr/activities/join/thjoinslave.cpp b/thorlcr/activities/join/thjoinslave.cpp index f7a07ee9f69..f2262256ab7 100644 --- a/thorlcr/activities/join/thjoinslave.cpp +++ b/thorlcr/activities/join/thjoinslave.cpp @@ -378,6 +378,7 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify rhsProgressCount = joinhelper->getRhsProgress(); { CriticalBlock b(joinHelperCrit); + joinhelper->gatherStats(inactiveStats); joinhelper.clear(); } ActPrintLog("SortJoinSlaveActivity::stop"); @@ -627,6 +628,7 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify } else { + joinhelper->gatherStats(activeStats); activeStats.setStatistic(StNumLeftRows, joinhelper->getLhsProgress()); if (!isSelfJoin) activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress()); diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 3db86c42edc..94d702c14b2 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -806,6 +806,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, { typedef CSlaveActivity PARENT; + JoinMatchStats matchStats; Owned leftexception; bool eos, eog, someSinceEog; @@ -949,6 +950,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, unsigned keepLimit; unsigned joined; unsigned joinCounter; + unsigned candidateCounter; OwnedConstThorRow defaultLeft; bool leftMatch, grouped; @@ -1165,10 +1167,12 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, inline const void *denormalizeNextRow() { ConstPointerArray filteredRhs; + unsigned candidates = 0; while (rhsNext) { if (abortSoon) return NULL; + candidates++; if (!fuzzyMatch || (HELPERBASE::match(leftRow, rhsNext))) { leftMatch = true; @@ -1187,6 +1191,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } rhsNext = tableProxy->getNextRHS(currentHashEntry); // NB: currentHashEntry only used for Lookup,Many case } + matchStats.noteGroup(1, candidates); if (filteredRhs.ordinality() || (!leftMatch && 0!=(flags & JFleftouter))) { unsigned rcCount = 0; @@ -1238,6 +1243,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, { leftRow.setown(left->nextRow()); joinCounter = 0; + candidateCounter = 0; if (leftRow) { eog = false; @@ -1273,6 +1279,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, RtlDynamicRowBuilder rowBuilder(allocator); while (rhsNext) { + candidateCounter++; if (!fuzzyMatch || HELPERBASE::match(leftRow, rhsNext)) { leftMatch = true; @@ -1295,6 +1302,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } rhsNext = tableProxy->getNextRHS(currentHashEntry); // NB: currentHashEntry used for Lookup,Many or All cases } + matchStats.noteGroup(1, candidateCounter); if (!leftMatch && NULL == rhsNext && 0!=(flags & JFleftouter)) { size32_t sz = HELPERBASE::joinTransform(rowBuilder, leftRow, defaultRight, 0, JTFmatchedleft); @@ -1330,6 +1338,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, joined = 0; joinCounter = 0; + candidateCounter = 0; leftMatch = false; returnMany = false; @@ -1472,6 +1481,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, { joined = 0; joinCounter = 0; + candidateCounter = 0; leftMatch = false; rhsNext = NULL; @@ -1631,6 +1641,11 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, { ActPrintLog("LHS input finished, %" RCPF "d rows read", count); } + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const override + { + PARENT::gatherActiveStats(activeStats); + matchStats.gatherStats(activeStats); + } }; @@ -3357,7 +3372,7 @@ class CAllJoinSlaveActivity : public CInMemJoinBase } } public: - CAllJoinSlaveActivity(CGraphElementBase *_container) : PARENT(_container) + CAllJoinSlaveActivity(CGraphElementBase *_container) : PARENT(_container, allJoinActivityStatistics) { returnMany = true; } diff --git a/thorlcr/activities/msort/thsortu.cpp b/thorlcr/activities/msort/thsortu.cpp index 0caf778d494..004ad74efa4 100644 --- a/thorlcr/activities/msort/thsortu.cpp +++ b/thorlcr/activities/msort/thsortu.cpp @@ -276,6 +276,7 @@ void swapRows(RtlDynamicRowBuilder &row1, RtlDynamicRowBuilder &row2) row1.swapWith(row2); } + class CJoinHelper : implements IJoinHelper, public CSimpleInterface { CActivityBase &activity; @@ -314,11 +315,13 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface OwnedConstThorRow defaultRight; Linked strmL; Linked strmR; + JoinMatchStats matchStats; bool abort = false; bool nextleftgot = false; bool nextrightgot = false; unsigned atmost = (unsigned)-1; rowcount_t lhsProgressCount = 0, rhsProgressCount = 0; + rowcount_t startMatchLhsProgressCount = 0; unsigned keepmax = (unsigned)-1; unsigned abortlimit = (unsigned)-1; unsigned keepremaining = (unsigned)-1; @@ -819,8 +822,11 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface } } while (state == JSonfail); + matchStats.noteGroup((lhsProgressCount - 1) - startMatchLhsProgressCount, 0); + startMatchLhsProgressCount = (lhsProgressCount - 1); // fall through case JScompare: + //Need to create a new match group when the right has been completely processed if (getL()) { rightidx = 0; rightgroupmatched = NULL; @@ -896,14 +902,29 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface if (!hitatmost&&rightgroup.ordinality()) state = JSmatch; else if (cmp<0) + { + //Left row and no match right row + matchStats.noteGroup(1, 0); // This will not spot large left groups + startMatchLhsProgressCount = lhsProgressCount; ret.setown(outrow(Onext,Oouter)); + } else + { + //Right row with no matching left rows. + //This will not spot large right groups since it processes a row at a time + matchStats.noteGroup(0, 1); ret.setown(outrow(Oouter,Onext)); + } } } - else if (getR()) + else if (getR()) + { + //We would miss tracking a very large trailing right group, but it is not worth + //the extra work to spot it + //FUTURE: if (!rightouter) we could return null and stop reading the rhs. ret.setown(outrow(Oouter,Onext)); + } else return NULL; break; @@ -920,6 +941,7 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface nextL(); } mcoreintercept->addWork(&leftgroup,&rightgroup); + startMatchLhsProgressCount = (lhsProgressCount - 1); // Never used, but keep consistent with other cases state = JScompare; } else if (rightidxdocompare(nextleft,prevleft); - if (cmp>0) + if (cmp>0) + { + //Finished processing this group -> gather the stats for the number of join candidates. + //lhsProgressCount is one higher than the the row count that follows the end of group + rowcount_t numLeftRows = (lhsProgressCount - 1) - startMatchLhsProgressCount; + matchStats.noteGroup(numLeftRows, rightgroup.ordinality()); + startMatchLhsProgressCount = (lhsProgressCount - 1); state = JSrightgrouponly; + } else if (cmp<0) { activity.logRow("prev: ", *allocatorL->queryOutputMeta(), prevleft); @@ -942,10 +971,17 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface } } else + { + //Finished processing this group -> gather the stats for the number of join candidates. + rowcount_t numLeftRows = lhsProgressCount - startMatchLhsProgressCount; + matchStats.noteGroup(numLeftRows, rightgroup.ordinality()); + startMatchLhsProgressCount = lhsProgressCount; state = JSrightgrouponly; + } } break; - case JSrightgrouponly: + case JSrightgrouponly: + //FUTURE: Avoid walking the right group if it is an inner/left only join. // right group if (rightidx INITIAL_SELFJOIN_MATCH_WARNING_LEVEL) { Owned e = MakeActivityWarning(&activity, TE_SelfJoinMatchWarning, "Exceeded initial match limit"); e->queryData().append((unsigned)curgroup.ordinality()); @@ -1299,6 +1343,12 @@ class SelfJoinHelper: implements IJoinHelper, public CSimpleInterface virtual void stop() { abort = true; } virtual rowcount_t getLhsProgress() const { return progressCount; } virtual rowcount_t getRhsProgress() const { return progressCount; } + virtual void gatherStats(CRuntimeStatisticCollection & stats) const override + { + //Left and right progress could be added here. + matchStats.gatherStats(stats); + } + }; IJoinHelper *createDenormalizeHelper(CActivityBase &activity, IHThorDenormalizeArg *helper, IThorRowInterfaces *rowIf) @@ -1464,7 +1514,7 @@ class CMultiCoreJoinHelperBase: implements IJoinHelper, implements IMulticoreInt Owned exc; CriticalSection sect; bool eos, selfJoin; - + JoinMatchStats matchStats; void setException(IException *e,const char *title) { @@ -1561,6 +1611,18 @@ class CMultiCoreJoinHelperBase: implements IJoinHelper, implements IMulticoreInt } } + void noteGroupSizes(CThorExpandingRowArray *lgroup,CThorExpandingRowArray *rgroup) + { + rowidx_t numLeft = lgroup ? lgroup->ordinality() : 0; + rowidx_t numRight = lgroup ? lgroup->ordinality() : 0; + matchStats.noteGroup(numLeft, numRight); + } + + virtual void gatherStats(CRuntimeStatisticCollection & stats) const override + { + matchStats.gatherStats(stats); + } + CMultiCoreJoinHelperBase(CActivityBase &_activity, unsigned numthreads, bool _selfJoin, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IThorRowInterfaces *_rowIf) : activity(_activity), rowIf(_rowIf) { @@ -1804,6 +1866,8 @@ class CMultiCoreJoinHelper: public CMultiCoreJoinHelperBase * The pull side, also pulls from the workers in sequence * This ensures the output is return in input order. */ + noteGroupSizes(lgroup, rgroup); + cWorker *worker = workers[curin]; worker->workready.wait(); workers[curin]->work.set(lgroup,rgroup); @@ -1987,6 +2051,7 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase // IMulticoreIntercept impl. virtual void addWork(CThorExpandingRowArray *lgroup,CThorExpandingRowArray *rgroup) { + noteGroupSizes(lgroup, rgroup); cWorkItem *item = new cWorkItem(activity, lgroup, rgroup); workqueue.enqueue(item); } diff --git a/thorlcr/activities/msort/thsortu.hpp b/thorlcr/activities/msort/thsortu.hpp index eef6aa1c929..006b3f4c7b6 100644 --- a/thorlcr/activities/msort/thsortu.hpp +++ b/thorlcr/activities/msort/thsortu.hpp @@ -60,18 +60,49 @@ interface IJoinHelper: public IRowStream virtual rowcount_t getRhsProgress() const = 0; virtual const void *nextRow() = 0; virtual void stop() = 0; + virtual void gatherStats(CRuntimeStatisticCollection & stats) const = 0; }; IJoinHelper *createJoinHelper(CActivityBase &activity, IHThorJoinArg *helper, IThorRowInterfaces *rowIf, bool parallelmatch, bool unsortedoutput); IJoinHelper *createSelfJoinHelper(CActivityBase &activity, IHThorJoinArg *helper, IThorRowInterfaces *rowIf, bool parallelmatch, bool unsortedoutput); IJoinHelper *createDenormalizeHelper(CActivityBase &activity, IHThorDenormalizeArg *helper, IThorRowInterfaces *rowIf); - - ILimitedCompareHelper *createLimitedCompareHelper(); - - - +//Included here so this can be shared between join and lookup join. +class JoinMatchStats +{ +public: + void gatherStats(CRuntimeStatisticCollection & stats) const + { + //Left and right progress could be added here. + if (maxLeftGroupSize) + stats.addStatistic(StNumMatchLeftRowsMax, maxLeftGroupSize); + if (maxRightGroupSize) + stats.addStatistic(StNumMatchRightRowsMax, maxRightGroupSize); + if (numMatchCandidates) + stats.addStatistic(StNumMatchCandidates, numMatchCandidates); + if (maxMatchCandidates) + stats.addStatistic(StNumMatchCandidatesMax, maxMatchCandidates); + } + + void noteGroup(rowcount_t numLeft, rowcount_t numRight) + { + rowcount_t numCandidates = numLeft * numRight; + if (numLeft > maxLeftGroupSize) + maxLeftGroupSize = numLeft; + if (numRight > maxRightGroupSize) + maxRightGroupSize = numRight; + numMatchCandidates += numCandidates; + if (numCandidates > maxMatchCandidates) + maxMatchCandidates = numCandidates; + } + +public: + stat_type maxLeftGroupSize = 0; + stat_type maxRightGroupSize = 0; + stat_type numMatchCandidates = 0; + stat_type maxMatchCandidates = 0; +}; #endif diff --git a/thorlcr/activities/selfjoin/thselfjoinslave.cpp b/thorlcr/activities/selfjoin/thselfjoinslave.cpp index 8a951a8fe06..5b76eba370a 100644 --- a/thorlcr/activities/selfjoin/thselfjoinslave.cpp +++ b/thorlcr/activities/selfjoin/thselfjoinslave.cpp @@ -195,6 +195,7 @@ class SelfJoinSlaveActivity : public CSlaveActivity } { CriticalBlock b(joinHelperCrit); + joinhelper->gatherStats(inactiveStats); joinhelper.clear(); } if (strm) @@ -231,8 +232,12 @@ class SelfJoinSlaveActivity : public CSlaveActivity { PARENT::gatherActiveStats(activeStats); CriticalBlock b(joinHelperCrit); - rowcount_t p = joinhelper?joinhelper->getLhsProgress():0; - activeStats.setStatistic(StNumLeftRows, p); + if (joinhelper) + { + joinhelper->gatherStats(activeStats); + rowcount_t p = joinhelper->getLhsProgress(); + activeStats.setStatistic(StNumLeftRows, p); + } mergeStats(activeStats, sorter, spillStatistics); // No danger of a race with reset() because that never replaces a valid sorter } }; diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index e3759811a48..6b6151eea37 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -83,8 +83,10 @@ const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexR const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, indexReadFileStatistics); const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); -const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics); -const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics); +const StatisticsMapping commonJoinActivityStatistics({StNumMatchLeftRowsMax, StNumMatchRightRowsMax, StNumMatchCandidates, StNumMatchCandidatesMax}, basicActivityStatistics); +const StatisticsMapping allJoinActivityStatistics({}, commonJoinActivityStatistics); +const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, commonJoinActivityStatistics); +const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, commonJoinActivityStatistics, spillStatistics); const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics); const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index cb259a7053a..4f3b54189d5 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -147,6 +147,7 @@ extern graph_decl const StatisticsMapping indexReadActivityStatistics; extern graph_decl const StatisticsMapping indexWriteActivityStatistics; extern graph_decl const StatisticsMapping joinActivityStatistics; extern graph_decl const StatisticsMapping keyedJoinActivityStatistics; +extern graph_decl const StatisticsMapping allJoinActivityStatistics; extern graph_decl const StatisticsMapping lookupJoinActivityStatistics; extern graph_decl const StatisticsMapping loopActivityStatistics; extern graph_decl const StatisticsMapping diskReadActivityStatistics;