Skip to content

Commit

Permalink
HPCC-29657 Only update aggregates if they have changed
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 4, 2023
1 parent c0f357e commit f6cac47
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
6 changes: 3 additions & 3 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2751,7 +2751,7 @@ class SubGraphUpdaterCollection : public CInterfaceOf<IStatisticCollection>
{
return rootCollection->addStatistic(kind,value);
}
virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override
virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override
{
return rootCollection->updateStatistic(kind, value, mergeAction);
}
Expand Down Expand Up @@ -2879,8 +2879,8 @@ void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection
// 2) Serialize the aggregates into a blob in GraphProgress rather than to global stats
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds);
statsCollection.refreshAggregates(aggregateKinds);
statsCollection.visit(statsAggregatorWriter);
if(statsCollection.refreshAggregates(aggregateKinds)) // Only serialize if the aggregates has changed
statsCollection.visit(statsAggregatorWriter);
}

//---------------------------------------------------------------------------------------------------------------------
Expand Down
20 changes: 13 additions & 7 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1905,7 +1905,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
stats.append(s);
}

virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override
virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override
{
if (mergeAction != StatsMergeAppend)
{
Expand All @@ -1914,13 +1914,19 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
Statistic & cur = stats.element(i);
if (cur.kind == kind)
{
if (mergeAction==StatsMergeReplace)
{
if (cur.value==value)
return false;
}
cur.value = mergeStatisticValue(cur.value, value, mergeAction);
return;
return true;
}
}
}
Statistic s(kind, value);
stats.append(s);
return true;
}

virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override
Expand Down Expand Up @@ -2065,8 +2071,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
if (iteratorVec!=aggregateKinds.end())
{
unsigned pos = iteratorVec-aggregateKinds.begin();
StatsMergeAction mergeAction = queryMergeMode(kind);
totals[pos] = mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction);
totals[pos] += stat.queryValue();
updated = true;
}
}
Expand All @@ -2081,14 +2086,15 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
}
if (updated)
{
updated=false;
std::vector<unsigned __int64>::iterator totalIter = totals.begin();
std::vector<unsigned __int64>::iterator subTotalIter = childTotals.begin();
std::vector<StatisticKind>::iterator kindIter = aggregateKinds.begin();
while (totalIter != totals.end())
{
StatsMergeAction mergeAction = queryMergeMode(*kindIter);
updateStatistic(*kindIter, *subTotalIter, mergeAction);
(*totalIter) = mergeStatisticValue(*totalIter, *subTotalIter, mergeAction);
if (updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace))
updated=true;
(*totalIter) += *subTotalIter;

++totalIter;
++subTotalIter;
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ interface IStatisticCollection : public IInterface
virtual void visitChildren(IStatisticVisitor & target) const = 0;
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0;
virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0;
virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0;
virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0;
virtual bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals) = 0;
virtual void deserialize(MemoryBuffer & in, unsigned version, StatisticScopeType minScope=SSTnone) = 0;
virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, StatisticScopeType minScope=SSTnone) = 0;
Expand Down

0 comments on commit f6cac47

Please sign in to comment.