Skip to content

Commit

Permalink
HPCC-30590 Report warning on any activity that has a large skew in ex…
Browse files Browse the repository at this point in the history
…ecution worktime

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Dec 7, 2023
1 parent a3306dd commit fa63f90
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
2 changes: 1 addition & 1 deletion common/wuanalysis/anaerrorcodes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ typedef enum
ANA_DISTRIB_SKEW_INPUT_ID,
ANA_DISTRIB_SKEW_OUTPUT_ID,
ANA_IOSKEW_RECORDS_ID,
ANA_UNUSED_ID, /* May re-use but don't remove to avoid changing later id's */
ANA_EXECUTE_SKEW_ID,
ANA_KJ_EXCESS_PREFILTER_ID
} AnalyzerErrorCode;

Expand Down
52 changes: 52 additions & 0 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,57 @@ class IoSkewRule : public AActivityRule
const char * category;
};

class LocalExecuteSkewRule : public AActivityRule
{
public:
virtual bool isCandidate(IWuActivity & activity) const override
{
switch (activity.getAttr(WaKind))
{
case TAKfirstn: // skew is expected, so ignore
case TAKtopn:
case TAKsort:
return false;
}
return true;
}

virtual bool check(PerformanceIssue & result, IWuActivity & activity, const IAnalyserOptions & options) override
{
stat_type localExecuteMaxSkew = activity.getStatRaw(StTimeLocalExecute, StSkewMax);
if (localExecuteMaxSkew<options.queryOption(watOptSkewThreshold))
return false;

stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);;
if (timePenalty<options.queryOption(watOptMinInterestingTime))
return false;

bool inputSkewed = false;
for(unsigned edgeNo = 0; IWuEdge *wuInputEdge = activity.queryInput(edgeNo); edgeNo++)
{
if (wuInputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
{
inputSkewed = true;
break;
}
}
bool outputSkewed = false;
IWuEdge *wuOutputEdge = activity.queryOutput(0);
if (wuOutputEdge && (wuOutputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold)))
outputSkewed = true;

if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time");
return true;
}
};

class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule
{
public:
Expand Down Expand Up @@ -221,4 +272,5 @@ void gatherRules(CIArrayOf<AActivityRule> & rules)
rules.append(*new IoSkewRule(StTimeDiskWriteIO, "disk write"));
rules.append(*new IoSkewRule(StTimeSpillElapsed, "spill"));
rules.append(*new KeyedJoinExcessRejectedRowsRule);
rules.append(*new LocalExecuteSkewRule);
}

0 comments on commit fa63f90

Please sign in to comment.