Skip to content

Commit

Permalink
HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is…
Browse files Browse the repository at this point in the history
… running

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 18, 2023
1 parent 9f4aa1e commit b0b22c3
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 147 deletions.
130 changes: 61 additions & 69 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2659,82 +2659,74 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
}
}

//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified)
cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope)
{
WuScopeFilter filter;
if (!isEmptyString(scope))
filter.addScope(scope);
else
filter.addScope(""); // Needed to match scope
// when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost
// (Costs from child graphs and activities should have been summed up to graph/subgraph level already)
// when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost
// (Costs from all levels below graph should be summed upto at least graph level already)
// i.e. need 2 levels of nesting
filter.setIncludeNesting(2);
// includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats",
// causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global"
filter.addSource("global");
filter.addOutputStatistic(StCostFileAccess);
filter.addRequiredStat(StCostFileAccess);
filter.finishedFilter();
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
cost_type totalCost = 0;
for (it->first(); it->isValid(); )
{
cost_type value = 0;
if (it->getStat(StCostFileAccess, value))
{
totalCost += value;
it->nextSibling();
}
else
{
it->next();
}
}
return totalCost;
}

void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill)
class StatisticsAggregatesWriter : implements IStatisticVisitor
{
WuScopeFilter filter;
if (!isEmptyString(scope))
filter.addScope(scope);
else
{
filter.addScope("");
filter.addSource("global");
}
filter.setIncludeNesting(1);
filter.addOutputStatistic(StSizeGraphSpill);
filter.addRequiredStat(StSizeGraphSpill);
filter.finishedFilter();
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
peakSizeSpill = 0;
for (it->first(); it->isValid(); )
std::vector<StatisticKind> & aggregateKinds;
Linked<IWorkUnit> wu;
public:
StatisticsAggregatesWriter(IWorkUnit * _wu, std::vector<StatisticKind> & _aggregateKinds): wu(_wu), aggregateKinds(_aggregateKinds) {}

virtual bool visitScope(const IStatisticCollection & cur)
{
stat_type value = 0;
if (it->getStat(StSizeGraphSpill, value))
switch (cur.queryScopeType())
{
if (value>peakSizeSpill)
peakSizeSpill = value;
it->nextSibling();
}
else
{
it->next();
case SSTglobal:
case SSTworkflow:
case SSTgraph:
for (auto kind: aggregateKinds)
{
stat_type value;
if (cur.getStatistic(kind, value) && value)
{
StringBuffer s;
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace);
}
}
if (cur.queryScopeType()==SSTgraph)
return false;
else
return true;
default:
return false;
}
}
}
};

void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType)
{
stat_type peakSizeSpill = 0;
gatherSpillSize(wu, scope, peakSizeSpill);
if (peakSizeSpill)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax);
void updateAggregates(IWorkUnit *wu)
{
// updateAggregates() reads in the graph stats, calculates the aggregates and then writes the aggregates back to global statistics.
//
// The next iteration of this PR would be to track the stats in memory and, periodically, calculate the aggregates and write the aggregates. For Thor:
// it could do this by:
// 1) Having a single global GlobalStatisticCollection(derived from CStatisticCollection) in DeMonServer
// 2) When the jobs startup, populate the GlobalStatisticCollection with aggregates and graph stats
// 2) Replace existing code related to gathering stats from CMasterGraph and serializing with
// a new method that updates the statistics to GlobalStatisticCollection
// 3) New method for gathering and serializing graph stats:
// (i) A new class (say StatisticCollectionGatherer) would be needed with an IStatisticGatherer interface which updates the GlobalStatisticCollection
// - This StatisticCollectionGatherer would be bound to a particular graph scope (this is because the CMasterGraph::getStats expects this to be case)
// - Everytime StatisticCollectionGatherer is called with beginScope where the scope type is a subgraph, it would need to delete that scope and its children.
// This is because CMasterGraph::getStats returns a fresh full set of stats for a given subgraph.
// (ii) Create a new member function to write the stats to GraphProgress and aggregates to global stats. Possibly,
// (a) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized.
// (b) this member function would be called by DeMonServer at fixed intevals
// 4) Modify GlobalStatisticCollection::refreshAggregates should clear the existing aggregates before calculating new aggregates
// - this is need because multiple calls to refreshAggregates should not add to existing aggregates
// Subsequent iteration:
// 1) Serialize the aggregates into a blob in GraphProgress rather than to global stats
Owned<IPropertyTree> root = getWUGraphProgress(wu->queryWuid(), true);
if (root)
{
Owned<IStatisticCollection> stats = createGlobalStatisticCollection(root);
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
std::vector<unsigned __int64> totals(aggregateKinds.size());

stats->refreshAggregates(aggregateKinds, totals);

StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds);
stats->visit(statsAggregatorWriter);
}
}
//---------------------------------------------------------------------------------------------------------------------

Expand Down
3 changes: 1 addition & 2 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1725,8 +1725,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope);
extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType);
extern WORKUNIT_API void updateAggregates(IWorkUnit *wu);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down
10 changes: 2 additions & 8 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1988,10 +1988,7 @@ void EclAgent::doProcess()
const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr);
if (diskAccessCost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(w, nullptr, SSTglobal);
updateAggregates(w);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2513,10 +2510,7 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope);
if (diskAccessCost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(wu, scope, SSTworkflow);
updateAggregates(wu);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
13 changes: 6 additions & 7 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -897,10 +897,6 @@ void EclSubGraph::updateProgress()
if (cost)
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
}
Owned<IStatisticCollection> statsCollection = stats.getResult();
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ;
if (costDiskAccess)
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
}
}
}
Expand All @@ -927,6 +923,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress)
}
ForEachItemIn(i2, subgraphs)
subgraphs.item(i2).updateProgress(progress);

Owned<IStatisticCollection> statsCollection = progress.getResult();
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection);
if (costDiskAccess)
progress.addStatistic(StCostFileAccess, costDiskAccess);
}

bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies)
Expand Down Expand Up @@ -1278,9 +1279,7 @@ void EclGraph::execute(const byte * parentExtract)
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);

const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope);
if (costDiskAccess)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
updateAggregates(wu);
}

if (agent->queryRemoteWorkunit())
Expand Down
Loading

0 comments on commit b0b22c3

Please sign in to comment.