Skip to content

Commit

Permalink
Address incorrect analyzer cost calculations and cost threshold
Browse files Browse the repository at this point in the history
* The rate used to calculate the cost of issues has been updated
so that the unit is consistant and produces valid cost calculation
* 'minInterestingCost' is a decimal value to set the minimum dollar
cost value for reported issues.
* minInterestingCost' is compared with the calculated cost of the
issue (rather than the timePenalty)

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Dec 13, 2024
1 parent 803672b commit 25bc03a
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 61 deletions.
12 changes: 5 additions & 7 deletions common/wuanalysis/anacommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void PerformanceIssue::print() const
printf("\n");
}

void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
void PerformanceIssue::createException(IWorkUnit * wu)
{
ErrorSeverity mappedSeverity = wu->getWarningSeverity(errorCode, (ErrorSeverity)SeverityWarning);
if (mappedSeverity == SeverityIgnore)
Expand All @@ -66,17 +66,15 @@ void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
StringBuffer s(comment); // Append scope to comment as scope column is not visible in ECLWatch
s.appendf(" (%s)", scope.str());
we->setExceptionMessage(s.str());
if (costRate!=0.0)
{
double timePenaltyPerHour = (double)statUnits2seconds(timePenalty) / 3600;
we->setCost(timePenaltyPerHour*costRate);
}
if (costPenalty!=0)
we->setCost(cost_type2money(costPenalty));
we->setExceptionSource(CostOptimizerName);
}

void PerformanceIssue::set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, const char * msg, ...)
void PerformanceIssue::set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, cost_type _costPenalty, const char * msg, ...)
{
timePenalty = _timePenalty;
costPenalty = _costPenalty;
errorCode = _errorCode;
va_list args;
va_start(args, msg);
Expand Down
9 changes: 6 additions & 3 deletions common/wuanalysis/anacommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ class PerformanceIssue : public CInterface
public:
int compareCost(const PerformanceIssue & other) const;
void print() const;
void createException(IWorkUnit * we, double costRate);
void createException(IWorkUnit * we);

void set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, const char * msg, ...) __attribute__((format(printf, 4, 5)));
void set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, cost_type _costPenalty, const char * msg, ...) __attribute__((format(printf, 5, 6)));
void setLocation(const char * definition);
void setScope(const char *_scope) { scope.set(_scope); }
stat_type getTimePenalityCost() const { return timePenalty; }
stat_type getTimePenalty() const { return timePenalty; }
cost_type getCostPenalty() const { return costPenalty; }

private:
AnalyzerErrorCode errorCode = ANA_GENERICERROR_ID;
Expand All @@ -76,6 +77,7 @@ class PerformanceIssue : public CInterface
unsigned column = 0;
StringAttr scope;
stat_type timePenalty = 0; // number of nanoseconds lost as a result.
cost_type costPenalty = 0;
StringBuffer comment;
};

Expand All @@ -87,6 +89,7 @@ enum WutOptionType
watOptSkewThreshold,
watOptMinRowsPerNode,
watPreFilteredKJThreshold,
watCostRatePerHour,
watOptMax
};

Expand Down
61 changes: 41 additions & 20 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
#include "anarule.hpp"
#include "commonext.hpp"


static cost_type calcCost(stat_type timePenalty, const stat_type clusterCostPerHour)
{
double timePenaltyPerHour = (double)statUnits2seconds(timePenalty) / 3600;
return timePenaltyPerHour*clusterCostPerHour;
}

class ActivityKindRule : public AActivityRule
{
public:
Expand Down Expand Up @@ -52,19 +59,21 @@ class DistributeSkewRule : public ActivityKindRule
stat_type rowsMaxSkew = outputEdge->getStatRaw(StNumRowsProcessed, StSkewMax);
if (rowsMaxSkew > options.queryOption(watOptSkewThreshold))
{
// Use downstream activity time to calculate approximate cost
// Use downstream activity time to calculate approximate timePenalty
IWuActivity * targetActivity = outputEdge->queryTarget();
assertex(targetActivity);
stat_type timeMaxLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StAvgX);
// Consider ways to improve this cost calculation further
stat_type cost = timeMaxLocalExecute - timeAvgLocalExecute;

// Consider ways to improve this timePenalty calculation further
stat_type timePenalty = timeMaxLocalExecute - timeAvgLocalExecute;
cost_type costPenalty = calcCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty < options.queryOption(watOptMinInterestingCost))
return false;
IWuEdge * inputEdge = activity.queryInput(0);
if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
result.set(ANA_DISTRIB_SKEW_INPUT_ID, cost, "DISTRIBUTE output skew is worse than input skew");
result.set(ANA_DISTRIB_SKEW_INPUT_ID, timePenalty, costPenalty, "DISTRIBUTE output skew is worse than input skew");
else
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, cost, "Significant skew in DISTRIBUTE output");
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, timePenalty, costPenalty, "Significant skew in DISTRIBUTE output");
updateInformation(result, activity);
return true;
}
Expand Down Expand Up @@ -135,12 +144,15 @@ class IoSkewRule : public AActivityRule
stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);

stat_type cost;
stat_type timePenalty;
if ((actkind==TAKspillread||actkind==TAKspillwrite) && (activity.getStatRaw(stat, StMinX) == 0))
{
//If one node didn't spill then it is possible the skew caused all the lost time
cost = timeMaxLocalExecute;
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Uneven worker spilling is causing uneven %s time", category);
timePenalty = timeMaxLocalExecute;
cost_type costPenalty = calcCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty < options.queryOption(watOptMinInterestingCost))
return false;
result.set(ANA_IOSKEW_RECORDS_ID, timePenalty, costPenalty, "Uneven worker spilling is causing uneven %s time", category);
}
else
{
Expand All @@ -161,16 +173,19 @@ class IoSkewRule : public AActivityRule
}
if (wuEdge && wuEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
numRowsSkew = true;
cost = (timeMaxLocalExecute - timeAvgLocalExecute);
timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);
cost_type costPenalty = calcCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty < options.queryOption(watOptMinInterestingCost))
return false;
if (sizeSkew)
{
if (numRowsSkew)
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in number of records is causing uneven %s time", category);
result.set(ANA_IOSKEW_RECORDS_ID, timePenalty, costPenalty, "Significant skew in number of records is causing uneven %s time", category);
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in record sizes is causing uneven %s time", category);
result.set(ANA_IOSKEW_RECORDS_ID, timePenalty, costPenalty, "Significant skew in record sizes is causing uneven %s time", category);
}
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in IO performance is causing uneven %s time", category);
result.set(ANA_IOSKEW_RECORDS_ID, timePenalty, costPenalty, "Significant skew in IO performance is causing uneven %s time", category);
}
updateInformation(result, activity);
return true;
Expand Down Expand Up @@ -206,7 +221,7 @@ class LocalExecuteSkewRule : public AActivityRule

stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);;
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);
if (timePenalty<options.queryOption(watOptMinInterestingTime))
return false;

Expand All @@ -224,12 +239,15 @@ class LocalExecuteSkewRule : public AActivityRule
if (wuOutputEdge && (wuOutputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold)))
outputSkewed = true;

cost_type costPenalty = calcCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty < options.queryOption(watOptMinInterestingCost))
return false;
if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven input");
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven output");
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time");
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time");
return true;
}
};
Expand All @@ -252,10 +270,13 @@ class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule
if (preFilteredPer > options.queryOption(watPreFilteredKJThreshold))
{
IWuActivity * inputActivity = inputEdge->querySource();
// Use input activity as the basis of cost because the rows generated from input activity is being filtered out
// Use input activity as the basis of timePenalty because the rows generated from input activity is being filtered out
stat_type timeAvgLocalExecute = inputActivity->getStatRaw(StTimeLocalExecute, StAvgX);
stat_type cost = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
result.set(ANA_KJ_EXCESS_PREFILTER_ID, cost, "Large number of rows from left dataset rejected in keyed join");
stat_type timePenalty = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
cost_type costPenalty = calcCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty < options.queryOption(watOptMinInterestingCost))
return false;
result.set(ANA_KJ_EXCESS_PREFILTER_ID, timePenalty, costPenalty, "Large number of rows from left dataset rejected in keyed join");
updateInformation(result, activity);
return true;
}
Expand Down
87 changes: 62 additions & 25 deletions common/wuanalysis/anawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ enum WutOptValueType
wutOptValueTypePercent,
wutOptValueTypeCount,
wutOptValueTypeBool,
wutOptValueTypeCost,
wutOptValueTypeMax,
};

Expand All @@ -71,10 +72,11 @@ struct WuOption

constexpr struct WuOption wuOptionsDefaults[watOptMax]
= { {watOptMinInterestingTime, "minInterestingTime", 1000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", 30000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", money2cost_type(0.1) /* $5 */, wutOptValueTypeCost},
{watOptSkewThreshold, "skewThreshold", 20, wutOptValueTypePercent},
{watOptMinRowsPerNode, "minRowsPerNode", 1000, wutOptValueTypeCount},
{watPreFilteredKJThreshold, "preFilteredKJThreshold", 50, wutOptValueTypePercent},
{watCostRatePerHour, "costRatePerHour", 0, wutOptValueTypeCost},
};

constexpr bool checkWuOptionsDefaults(int i = watOptMax)
Expand Down Expand Up @@ -107,9 +109,8 @@ class WuAnalyserOptions : public IAnalyserOptions
case wutOptValueTypePercent:
wuOptions[opt] = statPercent((stat_type)val);
break;
case wutOptValueTypeCost:
case wutOptValueTypeCount:
wuOptions[opt] = (stat_type) val;
break;
case wutOptValueTypeBool:
wuOptions[opt] = (stat_type) val;
break;
Expand All @@ -121,25 +122,63 @@ class WuAnalyserOptions : public IAnalyserOptions
void applyConfig(IPropertyTree *options)
{
if (!options) return;
bool minInterestedTimeExplitlySet = false;
for (int opt = watOptFirst; opt < watOptMax; opt++)
{
StringBuffer wuOptionName("@");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = options->getPropInt64(wuOptionName, -1);
if (val!=-1)
setOptionValue(static_cast<WutOptionType>(opt), val);
if (opt==watOptMinInterestingCost)
{
double val = options->getPropReal(wuOptionName, -1.0);
if (val>=0.0)
{
setOptionValue(static_cast<WutOptionType>(opt), money2cost_type(val));
// if cost option provided, use it instead of the default interesting time option
if (!minInterestedTimeExplitlySet)
setOptionValue(watOptMinInterestingTime, 0);
}
}
else
{
__int64 val = options->getPropInt64(wuOptionName, -1);
if (val!=-1)
{
setOptionValue(static_cast<WutOptionType>(opt), val);
if (opt==watOptMinInterestingTime)
minInterestedTimeExplitlySet = true;
}
}
}
}

void applyConfig(IConstWorkUnit * wu)
{
bool minInterestedTimeExplitlySet = false;
for (int opt = watOptFirst; opt < watOptMax; opt++)
{
StringBuffer wuOptionName("analyzer_");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = wu->getDebugValueInt64(wuOptionName, -1);
if (val!=-1)
setOptionValue(static_cast<WutOptionType>(opt), val);
if (opt==watOptMinInterestingCost)
{
double val = wu->getDebugValueReal(wuOptionName, -1.0);
if (val>=0.0)
{
setOptionValue(static_cast<WutOptionType>(opt), money2cost_type(val));
// if cost option provided, use it instead of the default interesting time option
if (!minInterestedTimeExplitlySet)
setOptionValue(watOptMinInterestingTime, 0);
}
}
else
{
__int64 val = wu->getDebugValueInt64(wuOptionName, -1);
if (val!=-1)
{
setOptionValue(static_cast<WutOptionType>(opt), val);
if (opt==watOptMinInterestingTime)
minInterestedTimeExplitlySet = true;
}
}
}
}
stat_type queryOption(WutOptionType opt) const override { return wuOptions[opt]; }
Expand Down Expand Up @@ -175,12 +214,12 @@ class WorkunitRuleAnalyser : public WorkunitAnalyserBase
public:
WorkunitRuleAnalyser();

void applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu);
void applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, double _costRate);

void applyRules();
void check(const char * scope, IWuActivity & activity);
void print();
void update(IWorkUnit *wu, double costRate);
void update(IWorkUnit *wu);

protected:
CIArrayOf<AActivityRule> rules;
Expand Down Expand Up @@ -1353,10 +1392,11 @@ WorkunitRuleAnalyser::WorkunitRuleAnalyser()
gatherRules(rules);
}

void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu)
void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, double costRate)
{
options.applyConfig(cfg);
options.applyConfig(wu);
options.setOptionValue(watCostRatePerHour, money2cost_type(costRate));
}


Expand All @@ -1372,11 +1412,8 @@ void WorkunitRuleAnalyser::check(const char * scope, IWuActivity & activity)
Owned<PerformanceIssue> issue (new PerformanceIssue);
if (rules.item(i).check(*issue, activity, options))
{
if (issue->getTimePenalityCost() >= options.queryOption(watOptMinInterestingCost))
{
if (!highestCostIssue || highestCostIssue->getTimePenalityCost() < issue->getTimePenalityCost())
highestCostIssue.setown(issue.getClear());
}
if (!highestCostIssue || highestCostIssue->getTimePenalty() < issue->getTimePenalty())
highestCostIssue.setown(issue.getClear());
}
}
}
Expand All @@ -1401,10 +1438,10 @@ void WorkunitRuleAnalyser::print()
issues.item(i).print();
}

void WorkunitRuleAnalyser::update(IWorkUnit *wu, double costRate)
void WorkunitRuleAnalyser::update(IWorkUnit *wu)
{
ForEachItemIn(i, issues)
issues.item(i).createException(wu, costRate);
issues.item(i).createException(wu);
}


Expand Down Expand Up @@ -2087,27 +2124,27 @@ void WorkunitStatsAnalyser::traceDependencies()

//---------------------------------------------------------------------------------------------------------------------

void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerMs)
void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerHour)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(options, wu);
analyser.applyConfig(options, wu, costPerHour);
analyser.analyse(wu, optGraph);
analyser.applyRules();
analyser.update(wu, costPerMs);
analyser.update(wu);
}

void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costRate, bool updatewu)
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerHour, bool updatewu)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(nullptr, wu);
analyser.applyConfig(nullptr, wu, costPerHour);
analyser.analyse(wu, nullptr);
analyser.applyRules();
analyser.print();
if (updatewu)
{
Owned<IWorkUnit> lockedwu = &(wu->lock());
lockedwu->clearExceptions(CostOptimizerName);
analyser.update(lockedwu, costRate);
analyser.update(lockedwu);
}
}

Expand Down
4 changes: 2 additions & 2 deletions common/wuanalysis/anawu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

#include "anacommon.hpp"

void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerMs);
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerMs, bool updatewu);
void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerHour);
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerHour, bool updatewu);

//---------------------------------------------------------------------------------------------------------------------

Expand Down
Loading

0 comments on commit 25bc03a

Please sign in to comment.