Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-31650 Address incorrect analyzer cost calculations and cost threshold #19360

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions common/wuanalysis/anacommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void PerformanceIssue::print() const
printf("\n");
}

void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
void PerformanceIssue::createException(IWorkUnit * wu)
{
ErrorSeverity mappedSeverity = wu->getWarningSeverity(errorCode, (ErrorSeverity)SeverityWarning);
if (mappedSeverity == SeverityIgnore)
Expand All @@ -66,17 +66,15 @@ void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
StringBuffer s(comment); // Append scope to comment as scope column is not visible in ECLWatch
s.appendf(" (%s)", scope.str());
we->setExceptionMessage(s.str());
if (costRate!=0.0)
{
double timePenaltyPerHour = (double)statUnits2seconds(timePenalty) / 3600;
we->setCost(timePenaltyPerHour*costRate);
}
if (costPenalty!=0)
we->setCost(cost_type2money(costPenalty));
we->setExceptionSource(CostOptimizerName);
}

void PerformanceIssue::set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, const char * msg, ...)
void PerformanceIssue::set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, cost_type _costPenalty, const char * msg, ...)
{
timePenalty = _timePenalty;
costPenalty = _costPenalty;
errorCode = _errorCode;
va_list args;
va_start(args, msg);
Expand Down
10 changes: 7 additions & 3 deletions common/wuanalysis/anacommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ class PerformanceIssue : public CInterface
public:
int compareCost(const PerformanceIssue & other) const;
void print() const;
void createException(IWorkUnit * we, double costRate);
void createException(IWorkUnit * we);

void set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, const char * msg, ...) __attribute__((format(printf, 4, 5)));
void set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, cost_type _costPenalty, const char * msg, ...) __attribute__((format(printf, 5, 6)));
void setLocation(const char * definition);
void setScope(const char *_scope) { scope.set(_scope); }
stat_type getTimePenalityCost() const { return timePenalty; }
stat_type getTimePenalty() const { return timePenalty; }
cost_type getCostPenalty() const { return costPenalty; }

private:
AnalyzerErrorCode errorCode = ANA_GENERICERROR_ID;
Expand All @@ -76,6 +77,7 @@ class PerformanceIssue : public CInterface
unsigned column = 0;
StringAttr scope;
stat_type timePenalty = 0; // number of nanoseconds lost as a result.
cost_type costPenalty = 0;
StringBuffer comment;
};

Expand All @@ -84,9 +86,11 @@ enum WutOptionType
watOptFirst=0,
watOptMinInterestingTime=0,
watOptMinInterestingCost,
watOptMinInterestingWaste,
watOptSkewThreshold,
watOptMinRowsPerNode,
watPreFilteredKJThreshold,
watClusterCostPerHour,
watOptMax
};

Expand Down
98 changes: 68 additions & 30 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@
#include "anarule.hpp"
#include "commonext.hpp"


// Convert the time wasted by an issue into a monetary cost.
//   clusterCostPerHour - hourly cluster cost; callers in this file pass the watClusterCostPerHour option
//   timeWasted         - time wasted, in the units accepted by statUnits2seconds
//                        (NOTE(review): presumably nanoseconds, as for PerformanceIssue::timePenalty - confirm)
// Returns (hours wasted * hourly cost), converted to cost_type.
static constexpr cost_type calcIssueCost(const stat_type clusterCostPerHour, stat_type timeWasted)
{
    // Force floating-point arithmetic: with an integral seconds value, "/ 3600" would
    // truncate any penalty shorter than one hour to zero and the issue would appear free.
    const double timePenaltyHours = static_cast<double>(statUnits2seconds(timeWasted)) / 3600.0;
    // The narrowing from double to cost_type is intentional; make it explicit.
    return static_cast<cost_type>(timePenaltyHours * static_cast<double>(clusterCostPerHour));
}

static constexpr bool isWorthReporting(const IAnalyserOptions & options, stat_type timeWasted, cost_type moneyWasted)
{
// if neither threshold is set, then report all issues
if (timeWasted && options.queryOption(watOptMinInterestingWaste)==0 && options.queryOption(watOptMinInterestingCost)==0)
return true;
// if the cluster cost available and the issue cost is greater than threshold, report it
if (options.queryOption(watClusterCostPerHour) && options.queryOption(watOptMinInterestingCost)
&& (moneyWasted >= options.queryOption(watOptMinInterestingCost)))
return true;
// if the issue time wasted is greater than threshold, report it
if (options.queryOption(watOptMinInterestingWaste) && (timeWasted >= options.queryOption(watOptMinInterestingWaste)))
return true;
return false;
}

class ActivityKindRule : public AActivityRule
{
public:
Expand Down Expand Up @@ -52,21 +74,24 @@ class DistributeSkewRule : public ActivityKindRule
stat_type rowsMaxSkew = outputEdge->getStatRaw(StNumRowsProcessed, StSkewMax);
if (rowsMaxSkew > options.queryOption(watOptSkewThreshold))
{
// Use downstream activity time to calculate approximate cost
// Use downstream activity time to calculate approximate time wasted
IWuActivity * targetActivity = outputEdge->queryTarget();
assertex(targetActivity);
stat_type timeMaxLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StAvgX);
// Consider ways to improve this cost calculation further
stat_type cost = timeMaxLocalExecute - timeAvgLocalExecute;

IWuEdge * inputEdge = activity.queryInput(0);
if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
result.set(ANA_DISTRIB_SKEW_INPUT_ID, cost, "DISTRIBUTE output skew is worse than input skew");
else
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, cost, "Significant skew in DISTRIBUTE output");
updateInformation(result, activity);
return true;
// Consider ways to improve this time wasted calculation
stat_type timeWasted = timeMaxLocalExecute - timeAvgLocalExecute;
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
IWuEdge * inputEdge = activity.queryInput(0);
if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
result.set(ANA_DISTRIB_SKEW_INPUT_ID, timeWasted, moneyWasted, "DISTRIBUTE output skew is worse than input skew");
else
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, timeWasted, moneyWasted, "Significant skew in DISTRIBUTE output");
updateInformation(result, activity);
return true;
}
}
return false;
}
Expand Down Expand Up @@ -135,12 +160,13 @@ class IoSkewRule : public AActivityRule
stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);

stat_type cost;
stat_type timeWasted;
const char * msg = nullptr;
if ((actkind==TAKspillread||actkind==TAKspillwrite) && (activity.getStatRaw(stat, StMinX) == 0))
{
//If one node didn't spill then it is possible the skew caused all the lost time
cost = timeMaxLocalExecute;
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Uneven worker spilling is causing uneven %s time", category);
timeWasted = timeMaxLocalExecute;
msg = "Uneven worker spilling";
}
else
{
Expand All @@ -161,19 +187,25 @@ class IoSkewRule : public AActivityRule
}
if (wuEdge && wuEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
numRowsSkew = true;
cost = (timeMaxLocalExecute - timeAvgLocalExecute);
timeWasted = (timeMaxLocalExecute - timeAvgLocalExecute);
if (sizeSkew)
{
if (numRowsSkew)
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in number of records is causing uneven %s time", category);
msg = "Significant skew in number of records";
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in record sizes is causing uneven %s time", category);
msg = "Significant skew in record sizes";
}
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in IO performance is causing uneven %s time", category);
msg = "Significant skew in IO performance";
}
assertex(msg);
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
result.set(ANA_IOSKEW_RECORDS_ID, timeWasted, moneyWasted, "%s is causing uneven %s time", msg, category);
updateInformation(result, activity);
return true;
}
updateInformation(result, activity);
return true;
}
return false;
}
Expand Down Expand Up @@ -206,8 +238,9 @@ class LocalExecuteSkewRule : public AActivityRule

stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);;
if (timePenalty<options.queryOption(watOptMinInterestingTime))
stat_type timeWasted = (timeMaxLocalExecute - timeAvgLocalExecute);
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (!isWorthReporting(options, timeWasted, moneyWasted))
return false;

bool inputSkewed = false;
Expand All @@ -225,11 +258,12 @@ class LocalExecuteSkewRule : public AActivityRule
outputSkewed = true;

if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven input");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven output");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time");
updateInformation(result, activity);
return true;
}
};
Expand All @@ -252,12 +286,16 @@ class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule
if (preFilteredPer > options.queryOption(watPreFilteredKJThreshold))
{
IWuActivity * inputActivity = inputEdge->querySource();
// Use input activity as the basis of cost because the rows generated from input activity is being filtered out
// Use input activity as the basis of time wasted because the rows generated by the input activity are being filtered out
stat_type timeAvgLocalExecute = inputActivity->getStatRaw(StTimeLocalExecute, StAvgX);
stat_type cost = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
result.set(ANA_KJ_EXCESS_PREFILTER_ID, cost, "Large number of rows from left dataset rejected in keyed join");
updateInformation(result, activity);
return true;
stat_type timeWasted = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
result.set(ANA_KJ_EXCESS_PREFILTER_ID, timeWasted, moneyWasted, "Large number of rows from left dataset rejected in keyed join");
updateInformation(result, activity);
return true;
}
}
}
}
Expand Down
61 changes: 38 additions & 23 deletions common/wuanalysis/anawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ enum WutOptValueType
wutOptValueTypePercent,
wutOptValueTypeCount,
wutOptValueTypeBool,
wutOptValueTypeCost,
wutOptValueTypeMax,
};

Expand All @@ -71,10 +72,13 @@ struct WuOption

constexpr struct WuOption wuOptionsDefaults[watOptMax]
= { {watOptMinInterestingTime, "minInterestingTime", 1000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", 30000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", money2cost_type(1.00) /* $1.00 */, wutOptValueTypeCost},
{watOptMinInterestingWaste, "minInterestingTimeWaste", 30000, wutOptValueTypeMSec},
{watOptSkewThreshold, "skewThreshold", 20, wutOptValueTypePercent},
{watOptMinRowsPerNode, "minRowsPerNode", 1000, wutOptValueTypeCount},
{watPreFilteredKJThreshold, "preFilteredKJThreshold", 50, wutOptValueTypePercent},
/* Note watClusterCostPerHour cannot be used as debug option or config option (this is calculated) */
{watClusterCostPerHour, "costRatePerHour", 0, wutOptValueTypeCost},
};

constexpr bool checkWuOptionsDefaults(int i = watOptMax)
Expand Down Expand Up @@ -107,9 +111,8 @@ class WuAnalyserOptions : public IAnalyserOptions
case wutOptValueTypePercent:
wuOptions[opt] = statPercent((stat_type)val);
break;
case wutOptValueTypeCost:
case wutOptValueTypeCount:
wuOptions[opt] = (stat_type) val;
break;
case wutOptValueTypeBool:
wuOptions[opt] = (stat_type) val;
break;
Expand All @@ -125,9 +128,15 @@ class WuAnalyserOptions : public IAnalyserOptions
{
StringBuffer wuOptionName("@");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = options->getPropInt64(wuOptionName, -1);
if (val!=-1)
if (options->hasProp(wuOptionName))
{
stat_type val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(options->getPropReal(wuOptionName));
else
val = options->getPropInt64(wuOptionName);
setOptionValue(static_cast<WutOptionType>(opt), val);
}
}
}

Expand All @@ -137,9 +146,15 @@ class WuAnalyserOptions : public IAnalyserOptions
{
StringBuffer wuOptionName("analyzer_");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = wu->getDebugValueInt64(wuOptionName, -1);
if (val!=-1)
if (wu->hasDebugValue(wuOptionName))
{
stat_type val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(wu->getDebugValueReal(wuOptionName, 0.0));
else
val = wu->getDebugValueInt64(wuOptionName, 0);
setOptionValue(static_cast<WutOptionType>(opt), val);
}
}
}
stat_type queryOption(WutOptionType opt) const override { return wuOptions[opt]; }
Expand Down Expand Up @@ -175,12 +190,12 @@ class WorkunitRuleAnalyser : public WorkunitAnalyserBase
public:
WorkunitRuleAnalyser();

void applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu);
void applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, double _costRate);

void applyRules();
void check(const char * scope, IWuActivity & activity);
void print();
void update(IWorkUnit *wu, double costRate);
void update(IWorkUnit *wu);

protected:
CIArrayOf<AActivityRule> rules;
Expand Down Expand Up @@ -1353,10 +1368,13 @@ WorkunitRuleAnalyser::WorkunitRuleAnalyser()
gatherRules(rules);
}

void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu)
// Load the analyser options, then record the caller-supplied cluster cost rate.
//   cfg      - property tree handed to options.applyConfig (a caller in this file passes nullptr)
//   wu       - workunit handed to options.applyConfig
//   costRate - cluster cost rate as money; converted with money2cost_type before it is stored
void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, double costRate)
{
// The configuration tree is applied first and the workunit second
options.applyConfig(cfg);
options.applyConfig(wu);
/* watClusterCostPerHour is calculated by the caller and is set here, after the other sources have been applied. */
/* (So watClusterCostPerHour cannot be supplied as a debug option or a config option.) */
options.setOptionValue(watClusterCostPerHour, money2cost_type(costRate));
}


Expand All @@ -1372,11 +1390,8 @@ void WorkunitRuleAnalyser::check(const char * scope, IWuActivity & activity)
Owned<PerformanceIssue> issue (new PerformanceIssue);
if (rules.item(i).check(*issue, activity, options))
{
if (issue->getTimePenalityCost() >= options.queryOption(watOptMinInterestingCost))
{
if (!highestCostIssue || highestCostIssue->getTimePenalityCost() < issue->getTimePenalityCost())
highestCostIssue.setown(issue.getClear());
}
if (!highestCostIssue || highestCostIssue->getTimePenalty() < issue->getTimePenalty())
highestCostIssue.setown(issue.getClear());
}
}
}
Expand All @@ -1401,10 +1416,10 @@ void WorkunitRuleAnalyser::print()
issues.item(i).print();
}

void WorkunitRuleAnalyser::update(IWorkUnit *wu, double costRate)
void WorkunitRuleAnalyser::update(IWorkUnit *wu)
{
ForEachItemIn(i, issues)
issues.item(i).createException(wu, costRate);
issues.item(i).createException(wu);
}


Expand Down Expand Up @@ -2087,27 +2102,27 @@ void WorkunitStatsAnalyser::traceDependencies()

//---------------------------------------------------------------------------------------------------------------------

void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerMs)
void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerHour)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(options, wu);
analyser.applyConfig(options, wu, costPerHour);
analyser.analyse(wu, optGraph);
analyser.applyRules();
analyser.update(wu, costPerMs);
analyser.update(wu);
}

void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costRate, bool updatewu)
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerHour, bool updatewu)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(nullptr, wu);
analyser.applyConfig(nullptr, wu, costPerHour);
analyser.analyse(wu, nullptr);
analyser.applyRules();
analyser.print();
if (updatewu)
{
Owned<IWorkUnit> lockedwu = &(wu->lock());
lockedwu->clearExceptions(CostOptimizerName);
analyser.update(lockedwu, costRate);
analyser.update(lockedwu);
}
}

Expand Down
Loading
Loading