Skip to content

Commit

Permalink
HPCC-29657 Use CRuntimeStatisticsCollection & StatisticMapping
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 24, 2023
1 parent c451206 commit faf5c11
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 42 deletions.
29 changes: 17 additions & 12 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2665,7 +2665,7 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
}
};

GlobalStatisticCollection::GlobalStatisticCollection()
GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(aggregateKindStatistics)
{
// Construct statsCollection here as GlobalStatisticCollection::load() is optional
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
Expand Down Expand Up @@ -2756,8 +2756,9 @@ void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit)

WuScopeFilter filter;
filter.addScopeType(SSTglobal).addScopeType(SSTgraph).addScopeType(SSTsubgraph);
for(auto aggregateKind: aggregateKinds)
filter.addOutputStatistic(aggregateKind);
const unsigned numStats = aggregateKindsMapping.numStatistics();
for (unsigned i=0; i<numStats; ++i)
filter.addOutputStatistic(aggregateKindsMapping.getKind(i));
filter.setDepth(1,3);
filter.setSources(SSFsearchGlobalStats);
filter.setIncludeNesting(0);
Expand All @@ -2784,20 +2785,21 @@ IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(const S
// Re-calculated aggregates from subgraph scope
bool GlobalStatisticCollection::refreshAggregates()
{
std::vector<unsigned __int64> totals(aggregateKinds.size());
std::vector<bool> isTotalUpdated(aggregateKinds.size());
return statsCollection->refreshAggregates(aggregateKinds, totals, isTotalUpdated);
CRuntimeStatisticCollection totals(aggregateKindsMapping);
Owned<IBitSet> totalUpdated = createBitSet(aggregateKindsMapping.numStatistics());
return statsCollection->refreshAggregates(totals, *totalUpdated);
}

// Refresh aggregates and write them to global stats
// Load aggregates in collection from global stats
void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu)
{
class StatisticsAggregatesWriter : implements IStatisticVisitor
{
std::vector<StatisticKind> & aggregateKinds;
const StatisticsMapping & aggregateKindsMapping;
const unsigned numStats;
Linked<IWorkUnit> wu;
public:
StatisticsAggregatesWriter(IWorkUnit * _wu, std::vector<StatisticKind> & _aggregateKinds): wu(_wu), aggregateKinds(_aggregateKinds) {}
StatisticsAggregatesWriter(IWorkUnit * _wu, const StatisticsMapping & _aggregateKindsMapping): wu(_wu), aggregateKindsMapping(_aggregateKindsMapping), numStats(aggregateKindsMapping.numStatistics()) {}

virtual bool visitScope(const IStatisticCollection & cur)
{
Expand All @@ -2806,8 +2808,9 @@ void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu)
case SSTglobal:
case SSTworkflow:
case SSTgraph:
for (auto kind: aggregateKinds)
for (unsigned i=0; i<numStats; ++i)
{
StatisticKind kind = aggregateKindsMapping.getKind(i);
stat_type value;
if (cur.getStatistic(kind, value) && value)
{
Expand All @@ -2824,9 +2827,11 @@ void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu)
}
}
};
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds);
if(refreshAggregates()) // Only serialize if the aggregates has changed
if (refreshAggregates()) // Only serialize if the aggregates has changed
{
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping);
statsCollection->visit(statsAggregatorWriter);
}
}

//---------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1797,7 +1797,7 @@ class WORKUNIT_API GlobalStatisticCollection : public CInterface
void updateAggregates(IWorkUnit *wu);
private:
Owned<IStatisticCollection> statsCollection;
std::vector<StatisticKind> aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile};
const StatisticsMapping & aggregateKindsMapping;
StringBuffer wuid;
};

Expand Down
47 changes: 19 additions & 28 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk
const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles});
const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles});
const StatisticsMapping aggregateKindStatistics({StCostFileAccess, StSizeGraphSpill, StSizeSpillFile});

const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode)
{
Expand Down Expand Up @@ -2089,58 +2090,48 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
cur.visit(visitor);
}

virtual bool refreshAggregates(const std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals, std::vector<bool> & isTotalUpdated) override
virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated) override
{
assertex(aggregateKinds.size()==totals.size());

const StatisticsMapping & mapping = totals.queryMapping();
bool updated = false;
if (queryScopeType()==SSTsubgraph || isDirty==false)
{
ForEachItemIn(i, stats)
{
Statistic & stat = stats.element(i);
StatisticKind kind = stat.queryKind();
auto iteratorVec = std::find(aggregateKinds.begin(), aggregateKinds.end(), kind);
if (iteratorVec!=aggregateKinds.end())
if (kind != (StatisticKind)(kind & StKindMask))
continue; // ignore variants->not supported by CRuntimeStatisticCollection
unsigned index = mapping.getIndex(kind);
if (index!=mapping.numStatistics())
{
StatsMergeAction mergeAction = queryMergeMode(kind);
unsigned pos = iteratorVec-aggregateKinds.begin();
totals[pos] += mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction);
totals.mergeStatistic(kind, stat.queryValue());
isTotalUpdated.set(index);
updated = true;
isTotalUpdated[pos]=true;
}
}
}
else
{
std::vector<unsigned __int64> childTotals(aggregateKinds.size());
std::vector<bool> isChildTotalUpdated(aggregateKinds.size()); // Every entry defaults false
CRuntimeStatisticCollection childTotals(mapping);
Owned<IBitSet> childTotalUpdated = createBitSet(mapping.numStatistics());
for (auto & child : children)
{
if (child.refreshAggregates(aggregateKinds, childTotals, isChildTotalUpdated))
if (child.refreshAggregates(childTotals, *childTotalUpdated))
updated = true;
}
if (updated)
{
std::vector<unsigned __int64>::iterator totalIter = totals.begin();
std::vector<unsigned __int64>::iterator subTotalIter = childTotals.begin();
std::vector<StatisticKind>::const_iterator kindIter = aggregateKinds.begin();
std::vector<bool>::const_iterator isChildTotalUpdatedIter = isChildTotalUpdated.begin();
std::vector<bool>::iterator isTotalUpdatedIter = isTotalUpdated.begin();
while (totalIter != totals.end())
ForEachItemIn(i, childTotals)
{
if (*isChildTotalUpdatedIter)
if (childTotalUpdated->test(i))
{
updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace);
StatsMergeAction mergeAction = queryMergeMode(*kindIter);
*totalIter = mergeStatisticValue(*totalIter, *subTotalIter, mergeAction);
*isTotalUpdatedIter = true;
StatisticKind kind = childTotals.getKind(i);
unsigned __int64 value = childTotals.queryStatisticByIndex(i).get();
updateStatistic(kind, value, StatsMergeReplace);
totals.mergeStatistic(kind, value);
isTotalUpdated.set(i);
}
++totalIter;
++subTotalIter;
++kindIter;
++isChildTotalUpdatedIter;
++isTotalUpdatedIter;
}
}
isDirty=false;
Expand Down
6 changes: 5 additions & 1 deletion system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <initializer_list>

#include "jstatcodes.h"
#include "jset.hpp"

typedef unsigned __int64 stat_type;
typedef unsigned __int64 cost_type; // Decimal currency amount multiplied by 10^6
Expand Down Expand Up @@ -116,6 +117,8 @@ enum StatsMergeAction
StatsMergeLast,
};

class StatisticsMapping;
class CRuntimeStatisticCollection;
interface IStatisticCollection : public IInterface
{
public:
Expand All @@ -142,7 +145,7 @@ interface IStatisticCollection : public IInterface
virtual IStatisticCollection * querySubScopePath(std::initializer_list<StatsScopeId> path) = 0;
virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0;
virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0;
virtual bool refreshAggregates(const std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals, std::vector<bool> & isKindUpdated) = 0;
virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated) = 0;
virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0;
virtual void deserializeChild(const StatsScopeId scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0;
virtual void clearStats() = 0;
Expand Down Expand Up @@ -507,6 +510,7 @@ extern const jlib_decl StatisticsMapping diskRemoteStatistics;
extern const jlib_decl StatisticsMapping diskReadRemoteStatistics;
extern const jlib_decl StatisticsMapping diskWriteRemoteStatistics;
extern const jlib_decl StatisticsMapping jhtreeCacheStatistics;
extern const jlib_decl StatisticsMapping aggregateKindStatistics;

//---------------------------------------------------------------------------------------------------------------------

Expand Down

0 comments on commit faf5c11

Please sign in to comment.