diff --git a/common/wuanalysis/anaerrorcodes.hpp b/common/wuanalysis/anaerrorcodes.hpp index e2503c27759..37198fb970a 100644 --- a/common/wuanalysis/anaerrorcodes.hpp +++ b/common/wuanalysis/anaerrorcodes.hpp @@ -9,7 +9,7 @@ typedef enum ANA_DISTRIB_SKEW_INPUT_ID, ANA_DISTRIB_SKEW_OUTPUT_ID, ANA_IOSKEW_RECORDS_ID, - ANA_UNUSED_ID, /* May re-use but don't remove to avoid changing later id's */ + ANA_EXECUTE_SKEW_ID, ANA_KJ_EXCESS_PREFILTER_ID } AnalyzerErrorCode; diff --git a/common/wuanalysis/anarule.cpp b/common/wuanalysis/anarule.cpp index 0191d17271c..5180c22d44d 100644 --- a/common/wuanalysis/anarule.cpp +++ b/common/wuanalysis/anarule.cpp @@ -183,6 +183,57 @@ class IoSkewRule : public AActivityRule const char * category; }; +class LocalExecuteSkewRule : public AActivityRule +{ +public: + virtual bool isCandidate(IWuActivity & activity) const override + { + switch (activity.getAttr(WaKind)) + { + case TAKfirstn: // skew is expected, so ignore + case TAKtopn: + case TAKsort: + return false; + } + return true; + } + + virtual bool check(PerformanceIssue & result, IWuActivity & activity, const IAnalyserOptions & options) override + { + stat_type localExecuteMaxSkew = activity.getStatRaw(StTimeLocalExecute, StSkewMax); + if (localExecuteMaxSkewgetStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold)) + { + inputSkewed = true; + break; + } + } + bool outputSkewed = false; + IWuEdge *wuOutputEdge = activity.queryOutput(0); + if (wuOutputEdge && (wuOutputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))) + outputSkewed = true; + + if (inputSkewed) + result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven input"); + else if (outputSkewed) + result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven output"); + else + result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time"); + return true; + } +}; + class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule { public: @@ -221,4 +272,5 @@ void gatherRules(CIArrayOf & rules) rules.append(*new IoSkewRule(StTimeDiskWriteIO, "disk write")); rules.append(*new IoSkewRule(StTimeSpillElapsed, "spill")); rules.append(*new KeyedJoinExcessRejectedRowsRule); + rules.append(*new LocalExecuteSkewRule); }