diff --git a/common/wuanalysis/anacommon.hpp b/common/wuanalysis/anacommon.hpp index a3dad25525c..e4ece324878 100644 --- a/common/wuanalysis/anacommon.hpp +++ b/common/wuanalysis/anacommon.hpp @@ -86,10 +86,11 @@ enum WutOptionType watOptFirst=0, watOptMinInterestingTime=0, watOptMinInterestingCost, + watOptMinInterestingWaste, watOptSkewThreshold, watOptMinRowsPerNode, watPreFilteredKJThreshold, - watCostRatePerHour, + watClusterCostPerHour, watOptMax }; diff --git a/common/wuanalysis/anarule.cpp b/common/wuanalysis/anarule.cpp index da5b62d61d8..0ec04245377 100644 --- a/common/wuanalysis/anarule.cpp +++ b/common/wuanalysis/anarule.cpp @@ -22,10 +22,25 @@ #include "commonext.hpp" -static cost_type calcIssueCost(stat_type timePenalty, const stat_type clusterCostPerHour) +static constexpr cost_type calcIssueCost(const stat_type clusterCostPerHour, stat_type timeWasted) { - double timePenaltyPerHour = (double)statUnits2seconds(timePenalty) / 3600; - return timePenaltyPerHour*clusterCostPerHour; + double timePenaltyHours = statUnits2seconds(timeWasted) / 3600; + return timePenaltyHours * clusterCostPerHour; +} + +static constexpr bool isWorthReporting(const IAnalyserOptions & options, stat_type timeWasted, cost_type moneyWasted) +{ + // if neither threshold is set, then report all issues + if (timeWasted && options.queryOption(watOptMinInterestingWaste)==0 && options.queryOption(watOptMinInterestingCost)==0) + return true; + // if the cluster cost available and the issue cost is greater than threshold, report it + if (options.queryOption(watClusterCostPerHour) && options.queryOption(watOptMinInterestingCost) + && (moneyWasted >= options.queryOption(watOptMinInterestingCost))) + return true; + // if the issue time wasted is greater than threshold, report it + if (options.queryOption(watOptMinInterestingWaste) && (timeWasted >= options.queryOption(watOptMinInterestingWaste))) + return true; + return false; } class ActivityKindRule : public AActivityRule @@ -59,21 +74,21 @@ class DistributeSkewRule : public ActivityKindRule stat_type rowsMaxSkew = outputEdge->getStatRaw(StNumRowsProcessed, StSkewMax); if (rowsMaxSkew > options.queryOption(watOptSkewThreshold)) { - // Use downstream activity time to calculate approximate timePenalty + // Use downstream activity time to calculate approximate time wasted IWuActivity * targetActivity = outputEdge->queryTarget(); assertex(targetActivity); stat_type timeMaxLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StMaxX); stat_type timeAvgLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StAvgX); - // Consider ways to improve this timePenalty calculation further - stat_type timePenalty = timeMaxLocalExecute - timeAvgLocalExecute; - cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour)); - if (costPenalty >= options.queryOption(watOptMinInterestingCost)) + // Consider ways to improve this time wasted calculation + stat_type timeWasted = timeMaxLocalExecute - timeAvgLocalExecute; + cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted); + if (isWorthReporting(options, timeWasted, moneyWasted)) { IWuEdge * inputEdge = activity.queryInput(0); if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew)) - result.set(ANA_DISTRIB_SKEW_INPUT_ID, timePenalty, costPenalty, "DISTRIBUTE output skew is worse than input skew"); + result.set(ANA_DISTRIB_SKEW_INPUT_ID, timeWasted, moneyWasted, "DISTRIBUTE output skew is worse than input skew"); else - result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, timePenalty, costPenalty, "Significant skew in DISTRIBUTE output"); + result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, timeWasted, moneyWasted, "Significant skew in DISTRIBUTE output"); updateInformation(result, activity); return true; } @@ -145,12 +160,12 @@ class IoSkewRule : public AActivityRule stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX); stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX); - stat_type timePenalty; + stat_type timeWasted; const char * msg = nullptr; if ((actkind==TAKspillread||actkind==TAKspillwrite) && (activity.getStatRaw(stat, StMinX) == 0)) { //If one node didn't spill then it is possible the skew caused all the lost time - timePenalty = timeMaxLocalExecute; + timeWasted = timeMaxLocalExecute; msg = "Uneven worker spilling"; } else @@ -172,7 +187,7 @@ class IoSkewRule : public AActivityRule } if (wuEdge && wuEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold)) numRowsSkew = true; - timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute); + timeWasted = (timeMaxLocalExecute - timeAvgLocalExecute); if (sizeSkew) { if (numRowsSkew) @@ -184,10 +199,10 @@ class IoSkewRule : public AActivityRule msg = "Significant skew in IO performance"; } assertex(msg); - cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour)); - if (costPenalty >= options.queryOption(watOptMinInterestingCost)) + cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted); + if (isWorthReporting(options, timeWasted, moneyWasted)) { - result.set(ANA_IOSKEW_RECORDS_ID, timePenalty, costPenalty, "%s is causing uneven %s time", msg, category); + result.set(ANA_IOSKEW_RECORDS_ID, timeWasted, moneyWasted, "%s is causing uneven %s time", msg, category); updateInformation(result, activity); return true; } @@ -223,8 +238,9 @@ class LocalExecuteSkewRule : public AActivityRule stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX); stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX); - stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute); - if (timePenaltygetStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))) outputSkewed = true; - cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour)); - if (costPenalty >= options.queryOption(watOptMinInterestingCost)) - { - if (inputSkewed) - result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time caused by uneven input"); - else if (outputSkewed) - result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time caused by uneven output"); - else - result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time"); - updateInformation(result, activity); - return true; - } - return false; + if (inputSkewed) + result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time caused by uneven input"); + else if (outputSkewed) + result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time caused by uneven output"); + else + result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time"); + updateInformation(result, activity); + return true; } }; @@ -275,13 +286,13 @@ class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule if (preFilteredPer > options.queryOption(watPreFilteredKJThreshold)) { IWuActivity * inputActivity = inputEdge->querySource(); - // Use input activity as the basis of timePenalty because the rows generated from input activity is being filtered out + // Use input activity as the basis of time wasted because the rows generated from input activity is being filtered out stat_type timeAvgLocalExecute = inputActivity->getStatRaw(StTimeLocalExecute, StAvgX); - stat_type timePenalty = statPercentageOf(timeAvgLocalExecute, preFilteredPer); - cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour)); - if (costPenalty >= options.queryOption(watOptMinInterestingCost)) + stat_type timeWasted = statPercentageOf(timeAvgLocalExecute, preFilteredPer); + cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted); + if (isWorthReporting(options, timeWasted, moneyWasted)) { - result.set(ANA_KJ_EXCESS_PREFILTER_ID, timePenalty, costPenalty, "Large number of rows from left dataset rejected in keyed join"); + result.set(ANA_KJ_EXCESS_PREFILTER_ID, timeWasted, moneyWasted, "Large number of rows from left dataset rejected in keyed join"); updateInformation(result, activity); return true; } diff --git a/common/wuanalysis/anawu.cpp b/common/wuanalysis/anawu.cpp index e19599771d0..8478f3045fb 100644 --- a/common/wuanalysis/anawu.cpp +++ b/common/wuanalysis/anawu.cpp @@ -72,12 +72,13 @@ struct WuOption constexpr struct WuOption wuOptionsDefaults[watOptMax] = { {watOptMinInterestingTime, "minInterestingTime", 1000, wutOptValueTypeMSec}, - {watOptMinInterestingCost, "minInterestingCost", money2cost_type(5.0) /* $5 */, wutOptValueTypeCost}, + {watOptMinInterestingCost, "minInterestingCost", money2cost_type(1.00) /* $1.00 */, wutOptValueTypeCost}, + {watOptMinInterestingWaste, "minInterestingTimeWaste", 30000, wutOptValueTypeMSec}, {watOptSkewThreshold, "skewThreshold", 20, wutOptValueTypePercent}, {watOptMinRowsPerNode, "minRowsPerNode", 1000, wutOptValueTypeCount}, {watPreFilteredKJThreshold, "preFilteredKJThreshold", 50, wutOptValueTypePercent}, - /* Note watCostRatePerHour cannot be used as debug option or config option (this is calculated) */ - {watCostRatePerHour, "costRatePerHour", 0, wutOptValueTypeCost}, + /* Note watClusterCostPerHour cannot be used as debug option or config option (this is calculated) */ + {watClusterCostPerHour, "costRatePerHour", 0, wutOptValueTypeCost}, }; constexpr bool checkWuOptionsDefaults(int i = watOptMax) @@ -127,13 +128,15 @@ class WuAnalyserOptions : public IAnalyserOptions { StringBuffer wuOptionName("@"); wuOptionName.append(wuOptionsDefaults[opt].name); - __int64 val = 0; - if (opt==watOptMinInterestingCost) - val = money2cost_type(options->getPropReal(wuOptionName, cost_type2money(-1.0))); - else - val = options->getPropInt64(wuOptionName, -1); - if (val>0) - setOptionValue(static_cast(opt), money2cost_type(val)); + if (options->hasProp(wuOptionName)) + { + stat_type val = 0; + if (opt==watOptMinInterestingCost) + val = money2cost_type(options->getPropReal(wuOptionName)); + else + val = options->getPropInt64(wuOptionName); + setOptionValue(static_cast(opt), val); + } } } @@ -143,13 +146,15 @@ class WuAnalyserOptions : public IAnalyserOptions { StringBuffer wuOptionName("analyzer_"); wuOptionName.append(wuOptionsDefaults[opt].name); - __int64 val = 0; - if (opt==watOptMinInterestingCost) - val = money2cost_type(wu->getDebugValueReal(wuOptionName, cost_type2money(-1.0))); - else - val = wu->getDebugValueInt64(wuOptionName, -1); - if (val>0) - setOptionValue(static_cast(opt), money2cost_type(val)); + if (wu->hasDebugValue(wuOptionName)) + { + stat_type val = 0; + if (opt==watOptMinInterestingCost) + val = money2cost_type(wu->getDebugValueReal(wuOptionName, 0.0)); + else + val = wu->getDebugValueInt64(wuOptionName, 0); + setOptionValue(static_cast(opt), val); + } } } stat_type queryOption(WutOptionType opt) const override { return wuOptions[opt]; } @@ -1367,9 +1372,9 @@ void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, { options.applyConfig(cfg); options.applyConfig(wu); - /* watCostRatePerHour is calculated by caller and its value is set in options*/ - /* (So, watCostRatePerHour cannot be used as debug option or config option)*/ - options.setOptionValue(watCostRatePerHour, money2cost_type(costRate)); + /* watClusterCostPerHour is calculated by caller and its value is set in options*/ + /* (So, watClusterCostPerHour cannot be used as debug option or config option)*/ + options.setOptionValue(watClusterCostPerHour, money2cost_type(costRate)); } diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 43c5bd441e5..d0ea8052040 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -34,8 +34,8 @@ const unsigned __int64 AnyStatisticValue = MaxStatisticValue; // Use the maximum inline constexpr stat_type seconds2StatUnits(stat_type secs) { return secs * 1000000000; } inline constexpr stat_type msecs2StatUnits(stat_type ms) { return ms * 1000000; } -inline constexpr stat_type statUnits2seconds(stat_type stat) {return stat / 1000000000; } -inline constexpr stat_type statUnits2msecs(stat_type stat) {return stat / 1000000; } +inline constexpr double statUnits2seconds(stat_type stat) {return ((double)stat) / 1000000000; } +inline constexpr double statUnits2msecs(stat_type stat) {return ((double)stat) / 1000000; } inline constexpr stat_type statPercent(int value) { return (stat_type)value * 100; } // Since 1 = 0.01% skew inline constexpr stat_type statPercent(double value) { return (stat_type)(value * 100); }