Skip to content

Commit

Permalink
HPCC-30433 Add match statistics to join activities in thor
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Jun 27, 2024
1 parent 30cae58 commit 78d8841
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 14 deletions.
4 changes: 4 additions & 0 deletions system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ enum StatisticKind
StSizeRemoteWrite,
StSizePeakTempDisk,
StSizePeakEphemeralDisk,
StNumMatchLeftRowsMax,
StNumMatchRightRowsMax,
StNumMatchCandidates,
StNumMatchCandidatesMax,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
7 changes: 7 additions & 0 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,10 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ SIZESTAT(RemoteWrite), "Size of data sent to remote workers"},
{ PEAKSIZESTAT(PeakTempDisk), "High water mark for temporary files"},
{ PEAKSIZESTAT(PeakEphemeralDisk), "High water mark for emphemeral storage use"},
{ NUMSTAT(MatchLeftRowsMax), "The largest number of left rows in a join group" },
{ NUMSTAT(MatchRightRowsMax), "The largest number of right rows in a join group" },
{ NUMSTAT(MatchCandidates), "The number of candidate combinations of left and right rows forming join groups" },
{ NUMSTAT(MatchCandidatesMax), "The largest number of candidate combinations of left and right rows in a single group" },
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down Expand Up @@ -3105,6 +3109,9 @@ static bool isWorthReportingMergedValue(StatisticKind kind)
{
case StSizePeakMemory:
case StSizePeakRowMemory:
case StNumMatchLeftRowsMax:
case StNumMatchRightRowsMax:
case StNumMatchCandidatesMax:
//These only make sense for individual nodes, the aggregated value is meaningless
return false;
}
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4042,6 +4042,7 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput
strmR.clear();
{
CriticalBlock b(joinHelperCrit);
joinhelper->gatherStats(inactiveStats);
joinhelper.clear();
}
PARENT::stop();
Expand Down Expand Up @@ -4087,6 +4088,7 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput
}
else
{
joinhelper->gatherStats(activeStats);
activeStats.setStatistic(StNumLeftRows, joinhelper->getLhsProgress());
activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
}
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/activities/join/thjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
rhsProgressCount = joinhelper->getRhsProgress();
{
CriticalBlock b(joinHelperCrit);
joinhelper->gatherStats(inactiveStats);
joinhelper.clear();
}
ActPrintLog("SortJoinSlaveActivity::stop");
Expand Down Expand Up @@ -627,6 +628,7 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
}
else
{
joinhelper->gatherStats(activeStats);
activeStats.setStatistic(StNumLeftRows, joinhelper->getLhsProgress());
if (!isSelfJoin)
activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
Expand Down
19 changes: 18 additions & 1 deletion thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
{
typedef CSlaveActivity PARENT;

JoinMatchStats matchStats;
Owned<IException> leftexception;

bool eos, eog, someSinceEog;
Expand Down Expand Up @@ -949,6 +950,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
unsigned keepLimit;
unsigned joined;
unsigned joinCounter;
unsigned candidateCounter;
OwnedConstThorRow defaultLeft;

bool leftMatch, grouped;
Expand Down Expand Up @@ -1165,10 +1167,12 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
inline const void *denormalizeNextRow()
{
ConstPointerArray filteredRhs;
unsigned candidates = 0;
while (rhsNext)
{
if (abortSoon)
return NULL;
candidates++;
if (!fuzzyMatch || (HELPERBASE::match(leftRow, rhsNext)))
{
leftMatch = true;
Expand All @@ -1187,6 +1191,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
rhsNext = tableProxy->getNextRHS(currentHashEntry); // NB: currentHashEntry only used for Lookup,Many case
}
matchStats.noteGroup(1, candidates);
if (filteredRhs.ordinality() || (!leftMatch && 0!=(flags & JFleftouter)))
{
unsigned rcCount = 0;
Expand Down Expand Up @@ -1238,6 +1243,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
{
leftRow.setown(left->nextRow());
joinCounter = 0;
candidateCounter = 0;
if (leftRow)
{
eog = false;
Expand Down Expand Up @@ -1273,6 +1279,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
RtlDynamicRowBuilder rowBuilder(allocator);
while (rhsNext)
{
candidateCounter++;
if (!fuzzyMatch || HELPERBASE::match(leftRow, rhsNext))
{
leftMatch = true;
Expand All @@ -1289,12 +1296,15 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
rhsNext = NULL;
else
rhsNext = tableProxy->getNextRHS(currentHashEntry); // NB: currentHashEntry only used for Lookup,Many case
if (!rhsNext)
matchStats.noteGroup(1, candidateCounter);
return row.getClear();
}
}
}
rhsNext = tableProxy->getNextRHS(currentHashEntry); // NB: currentHashEntry used for Lookup,Many or All cases
}
matchStats.noteGroup(1, candidateCounter);
if (!leftMatch && NULL == rhsNext && 0!=(flags & JFleftouter))
{
size32_t sz = HELPERBASE::joinTransform(rowBuilder, leftRow, defaultRight, 0, JTFmatchedleft);
Expand Down Expand Up @@ -1330,6 +1340,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,

joined = 0;
joinCounter = 0;
candidateCounter = 0;
leftMatch = false;
returnMany = false;

Expand Down Expand Up @@ -1472,6 +1483,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
{
joined = 0;
joinCounter = 0;
candidateCounter = 0;
leftMatch = false;
rhsNext = NULL;

Expand Down Expand Up @@ -1631,6 +1643,11 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
{
ActPrintLog("LHS input finished, %" RCPF "d rows read", count);
}
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const override
{
PARENT::gatherActiveStats(activeStats);
matchStats.gatherStats(activeStats);
}
};


Expand Down Expand Up @@ -3359,7 +3376,7 @@ class CAllJoinSlaveActivity : public CInMemJoinBase<CAllTable, IHThorAllJoinArg>
}
}
public:
CAllJoinSlaveActivity(CGraphElementBase *_container) : PARENT(_container)
CAllJoinSlaveActivity(CGraphElementBase *_container) : PARENT(_container, allJoinActivityStatistics)
{
returnMany = true;
}
Expand Down
75 changes: 71 additions & 4 deletions thorlcr/activities/msort/thsortu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ void swapRows(RtlDynamicRowBuilder &row1, RtlDynamicRowBuilder &row2)
row1.swapWith(row2);
}


class CJoinHelper : implements IJoinHelper, public CSimpleInterface
{
CActivityBase &activity;
Expand Down Expand Up @@ -314,11 +315,13 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface
OwnedConstThorRow defaultRight;
Linked<IRowStream> strmL;
Linked<IRowStream> strmR;
JoinMatchStats matchStats;
bool abort = false;
bool nextleftgot = false;
bool nextrightgot = false;
unsigned atmost = (unsigned)-1;
rowcount_t lhsProgressCount = 0, rhsProgressCount = 0;
rowcount_t startMatchLhsProgressCount = 0;
unsigned keepmax = (unsigned)-1;
unsigned abortlimit = (unsigned)-1;
unsigned keepremaining = (unsigned)-1;
Expand Down Expand Up @@ -819,8 +822,13 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface
}
}
while (state == JSonfail);
//We have read a row that does not match, so decrement by 1 to get the count for the row that mismatched
rowcount_t nextStartMatchLhsProgressCount = lhsProgressCount - 1;
matchStats.noteGroup(nextStartMatchLhsProgressCount - startMatchLhsProgressCount, 0);
startMatchLhsProgressCount = nextStartMatchLhsProgressCount;
// fall through
case JScompare:
//Need to create a new match group when the right has been completely processed
if (getL()) {
rightidx = 0;
rightgroupmatched = NULL;
Expand Down Expand Up @@ -896,14 +904,29 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface
if (!hitatmost&&rightgroup.ordinality())
state = JSmatch;
else if (cmp<0)
{
//Left row and no match right row
matchStats.noteGroup(1, 0); // This will not spot large left groups
startMatchLhsProgressCount = lhsProgressCount;
ret.setown(outrow(Onext,Oouter));
}
else
{
//Right row with no matching left rows.
//This will not spot large right groups since it processes a row at a time
matchStats.noteGroup(0, 1);
ret.setown(outrow(Oouter,Onext));
}
}

}
else if (getR())
else if (getR())
{
//We would miss tracking a very large trailing right group, but it is not worth
//the extra work to spot it
//FUTURE: if (!rightouter) we could return null and stop reading the rhs.
ret.setown(outrow(Oouter,Onext));
}
else
return NULL;
break;
Expand All @@ -920,6 +943,7 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface
nextL();
}
mcoreintercept->addWork(&leftgroup,&rightgroup);
startMatchLhsProgressCount = (lhsProgressCount - 1); // Never used, but keep consistent with other cases
state = JScompare;
}
else if (rightidx<rightgroup.ordinality()) {
Expand All @@ -932,8 +956,15 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface
rightidx = 0;
if (getL()) {
int cmp = compareL->docompare(nextleft,prevleft);
if (cmp>0)
if (cmp>0)
{
//Finished processing this group -> gather the stats for the number of join candidates.
//lhsProgressCount is one higher than the the row count that follows the end of group
rowcount_t numLeftRows = (lhsProgressCount - 1) - startMatchLhsProgressCount;
matchStats.noteGroup(numLeftRows, rightgroup.ordinality());
startMatchLhsProgressCount = (lhsProgressCount - 1);
state = JSrightgrouponly;
}
else if (cmp<0)
{
activity.logRow("prev: ", *allocatorL->queryOutputMeta(), prevleft);
Expand All @@ -942,10 +973,17 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface
}
}
else
{
//Finished processing this group -> gather the stats for the number of join candidates.
rowcount_t numLeftRows = lhsProgressCount - startMatchLhsProgressCount;
matchStats.noteGroup(numLeftRows, rightgroup.ordinality());
startMatchLhsProgressCount = lhsProgressCount;
state = JSrightgrouponly;
}
}
break;
case JSrightgrouponly:
case JSrightgrouponly:
//FUTURE: Avoid walking the right group if it is an inner/left only join.
// right group
if (rightidx<rightgroup.ordinality())
ret.setown(outrow(Oouter,Ogroup));
Expand All @@ -965,6 +1003,12 @@ class CJoinHelper : implements IJoinHelper, public CSimpleInterface
virtual void stop() { abort = true; }
virtual rowcount_t getLhsProgress() const { return lhsProgressCount; }
virtual rowcount_t getRhsProgress() const { return rhsProgressCount; }

virtual void gatherStats(CRuntimeStatisticCollection & stats) const override
{
//Left and right progress could be added here.
matchStats.gatherStats(stats);
}
};

class SelfJoinHelper: implements IJoinHelper, public CSimpleInterface
Expand All @@ -975,6 +1019,7 @@ class SelfJoinHelper: implements IJoinHelper, public CSimpleInterface
CThorExpandingRowArray curgroup;
unsigned leftidx = 0;
unsigned rightidx = 0;
JoinMatchStats matchStats;
bool leftmatched = false;
MemoryBuffer rightmatchedbuf;
bool *rightmatched = nullptr;
Expand Down Expand Up @@ -1193,6 +1238,7 @@ class SelfJoinHelper: implements IJoinHelper, public CSimpleInterface
eof = 0;
return NULL;
}
matchStats.noteGroup(curgroup.ordinality(), curgroup.ordinality());
if (curgroup.ordinality() > INITIAL_SELFJOIN_MATCH_WARNING_LEVEL) {
Owned<IThorException> e = MakeActivityWarning(&activity, TE_SelfJoinMatchWarning, "Exceeded initial match limit");
e->queryData().append((unsigned)curgroup.ordinality());
Expand Down Expand Up @@ -1299,6 +1345,12 @@ class SelfJoinHelper: implements IJoinHelper, public CSimpleInterface
virtual void stop() { abort = true; }
virtual rowcount_t getLhsProgress() const { return progressCount; }
virtual rowcount_t getRhsProgress() const { return progressCount; }
virtual void gatherStats(CRuntimeStatisticCollection & stats) const override
{
//Left and right progress could be added here.
matchStats.gatherStats(stats);
}

};

IJoinHelper *createDenormalizeHelper(CActivityBase &activity, IHThorDenormalizeArg *helper, IThorRowInterfaces *rowIf)
Expand Down Expand Up @@ -1464,7 +1516,7 @@ class CMultiCoreJoinHelperBase: implements IJoinHelper, implements IMulticoreInt
Owned<IException> exc;
CriticalSection sect;
bool eos, selfJoin;

JoinMatchStats matchStats;

void setException(IException *e,const char *title)
{
Expand Down Expand Up @@ -1561,6 +1613,18 @@ class CMultiCoreJoinHelperBase: implements IJoinHelper, implements IMulticoreInt
}
}

void noteGroupSizes(CThorExpandingRowArray *lgroup,CThorExpandingRowArray *rgroup)
{
rowidx_t numLeft = lgroup ? lgroup->ordinality() : 0;
rowidx_t numRight = lgroup ? lgroup->ordinality() : 0;
matchStats.noteGroup(numLeft, numRight);
}

virtual void gatherStats(CRuntimeStatisticCollection & stats) const override
{
matchStats.gatherStats(stats);
}

CMultiCoreJoinHelperBase(CActivityBase &_activity, unsigned numthreads, bool _selfJoin, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IThorRowInterfaces *_rowIf)
: activity(_activity), rowIf(_rowIf)
{
Expand Down Expand Up @@ -1804,6 +1868,8 @@ class CMultiCoreJoinHelper: public CMultiCoreJoinHelperBase
* The pull side, also pulls from the workers in sequence
* This ensures the output is return in input order.
*/
noteGroupSizes(lgroup, rgroup);

cWorker *worker = workers[curin];
worker->workready.wait();
workers[curin]->work.set(lgroup,rgroup);
Expand Down Expand Up @@ -1987,6 +2053,7 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase
// IMulticoreIntercept impl.
virtual void addWork(CThorExpandingRowArray *lgroup,CThorExpandingRowArray *rgroup)
{
noteGroupSizes(lgroup, rgroup);
cWorkItem *item = new cWorkItem(activity, lgroup, rgroup);
workqueue.enqueue(item);
}
Expand Down
Loading

0 comments on commit 78d8841

Please sign in to comment.