diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 64f569e5284..1c79fdbe821 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -2659,82 +2659,74 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu } } -//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified) -cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope) -{ - WuScopeFilter filter; - if (!isEmptyString(scope)) - filter.addScope(scope); - else - filter.addScope(""); // Needed to match scope - // when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost - // (Costs from child graphs and activities should have been summed up to graph/subgraph level already) - // when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost - // (Costs from all levels below graph should be summed upto at least graph level already) - // i.e. need 2 levels of nesting - filter.setIncludeNesting(2); - // includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats", - // causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global" - filter.addSource("global"); - filter.addOutputStatistic(StCostFileAccess); - filter.addRequiredStat(StCostFileAccess); - filter.finishedFilter(); - Owned it = &wu->getScopeIterator(filter); - cost_type totalCost = 0; - for (it->first(); it->isValid(); ) - { - cost_type value = 0; - if (it->getStat(StCostFileAccess, value)) - { - totalCost += value; - it->nextSibling(); - } - else - { - it->next(); - } - } - return totalCost; -} -void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill) +class StatisticsAggregatesWriter : implements IStatisticVisitor { - WuScopeFilter filter; - if (!isEmptyString(scope)) - filter.addScope(scope); - else - { - filter.addScope(""); - filter.addSource("global"); - } - filter.setIncludeNesting(1); - filter.addOutputStatistic(StSizeGraphSpill); - filter.addRequiredStat(StSizeGraphSpill); - filter.finishedFilter(); - Owned it = &wu->getScopeIterator(filter); - peakSizeSpill = 0; - for (it->first(); it->isValid(); ) + std::vector & aggregateKinds; + Linked wu; +public: + StatisticsAggregatesWriter(IWorkUnit * _wu, std::vector & _aggregateKinds): wu(_wu), aggregateKinds(_aggregateKinds) {} + + virtual bool visitScope(const IStatisticCollection & cur) { - stat_type value = 0; - if (it->getStat(StSizeGraphSpill, value)) + switch (cur.queryScopeType()) { - if (value>peakSizeSpill) - peakSizeSpill = value; - it->nextSibling(); - } - else - { - it->next(); + case SSTglobal: + case SSTworkflow: + case SSTgraph: + for (auto kind: aggregateKinds) + { + stat_type value; + if (cur.getStatistic(kind, value) && value) + { + StringBuffer s; + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace); + } + } + if (cur.queryScopeType()==SSTgraph) + return false; + else + return true; + default: + return false; } } -} +}; -void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType) -{ - stat_type peakSizeSpill = 0; - gatherSpillSize(wu, scope, peakSizeSpill); - if (peakSizeSpill) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax); +void updateAggregates(IWorkUnit *wu) +{ + // updateAggregates() reads in the graph stats, calculates the aggregates and then writes the aggregates back to global statistics. + // + // The next iteration of this PR would be to track the stats in memory and, periodically, calculate the aggregates and write the aggregates. For Thor: + // it could do this by: + // 1) Having a single global GlobalStatisticCollection(derived from CStatisticCollection) in DeMonServer + // 2) When the jobs startup, populate the GlobalStatisticCollection with aggregates and graph stats + // 2) Replace existing code related to gathering stats from CMasterGraph and serializing with + // a new method that updates the statistics to GlobalStatisticCollection + // 3) New method for gathering and serializing graph stats: + // (i) A new class (say StatisticCollectionGatherer) would be needed with an IStatisticGatherer interface which updates the GlobalStatisticCollection + // - This StatisticCollectionGatherer would be bound to a particular graph scope (this is because the CMasterGraph::getStats expects this to be case) + // - Everytime StatisticCollectionGatherer is called with beginScope where the scope type is a subgraph, it would need to delete that scope and its children. + // This is because CMasterGraph::getStats returns a fresh full set of stats for a given subgraph. + // (ii) Create a new member function to write the stats to GraphProgress and aggregates to global stats. Possibly, + // (a) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized. + // (b) this member function would be called by DeMonServer at fixed intevals + // 4) Modify GlobalStatisticCollection::refreshAggregates should clear the existing aggregates before calculating new aggregates + // - this is need because multiple calls to refreshAggregates should not add to existing aggregates + // Subsequent iteration: + // 1) Serialize the aggregates into a blob in GraphProgress rather than to global stats + Owned root = getWUGraphProgress(wu->queryWuid(), true); + if (root) + { + Owned stats = createGlobalStatisticCollection(root); + std::vector aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}; + std::vector totals(aggregateKinds.size()); + + stats->refreshAggregates(aggregateKinds, totals); + + StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds); + stats->visit(statsAggregatorWriter); + } } //--------------------------------------------------------------------------------------------------------------------- diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index 60c06b5a56d..3769438de71 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1725,8 +1725,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer); extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); -extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope); -extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType); +extern WORKUNIT_API void updateAggregates(IWorkUnit *wu); extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); extern WORKUNIT_API void descheduleWorkunit(char const * wuid); #if 0 diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index dc22e1f69dd..61314717e6c 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1988,10 +1988,7 @@ void EclAgent::doProcess() const cost_type cost = aggregateCost(w, nullptr, false); if (cost) w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr); - if (diskAccessCost) - w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - updateSpillSize(w, nullptr, SSTglobal); + updateAggregates(w); addTimings(w); switch (w->getState()) @@ -2513,10 +2510,7 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope); - if (diskAccessCost) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - updateSpillSize(wu, scope, SSTworkflow); + updateAggregates(wu); } void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item) diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index d4ffdd99c42..eacfad7f992 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -897,10 +897,6 @@ void EclSubGraph::updateProgress() if (cost) lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); } - Owned statsCollection = stats.getResult(); - const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ; - if (costDiskAccess) - lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } } } @@ -927,6 +923,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress) } ForEachItemIn(i2, subgraphs) subgraphs.item(i2).updateProgress(progress); + + Owned statsCollection = progress.getResult(); + const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection); + if (costDiskAccess) + progress.addStatistic(StCostFileAccess, costDiskAccess); } bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies) @@ -1278,9 +1279,7 @@ void EclGraph::execute(const byte * parentExtract) if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope); - if (costDiskAccess) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); + updateAggregates(wu); } if (agent->queryRemoteWorkunit()) diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 8ebcdcadef5..ec62349dd7b 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -24,6 +24,7 @@ #include "jregexp.hpp" #include "jfile.hpp" #include "jerror.hpp" +#include "jlzw.hpp" #include #include #include @@ -1429,6 +1430,8 @@ StringBuffer & StatsScopeId::getScopeText(StringBuffer & out) const return out.append(ChannelScopePrefix).append(id); case SSTunknown: return out.append(name); + case SSTglobal: + return out; default: #ifdef _DEBUG throwUnexpected(); @@ -1747,11 +1750,11 @@ enum }; class CStatisticCollection; -static CStatisticCollection * deserializeCollection(CStatisticCollection * parent, MemoryBuffer & in, unsigned version); +static IStatisticCollection * deserializeCollection(IStatisticCollection * parent, MemoryBuffer & in, unsigned version); //MORE: Create an implementation with no children typedef StructArrayOf StatsArray; -class CollectionHashTable : public SuperHashTableOf +class CollectionHashTable : public SuperHashTableOf { public: ~CollectionHashTable() { _releaseAll(); } @@ -1783,31 +1786,16 @@ class CStatisticCollection : public CInterfaceOf { friend class CollectionHashTable; public: - CStatisticCollection(CStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent) + CStatisticCollection(IStatisticCollection * _parent=nullptr) : parent(_parent) {} + + CStatisticCollection(IStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent) { } - CStatisticCollection(CStatisticCollection * _parent, MemoryBuffer & in, unsigned version) : parent(_parent) + CStatisticCollection(IStatisticCollection * _parent, MemoryBuffer & in, unsigned version) : parent(_parent) { id.deserialize(in, version); - - unsigned numStats; - in.read(numStats); - stats.ensureCapacity(numStats); - while (numStats-- > 0) - { - Statistic next (in, version); - stats.append(next); - } - - unsigned numChildren; - in.read(numChildren); - children.ensure(numChildren); - while (numChildren-- > 0) - { - CStatisticCollection * next = deserializeCollection(this, in, version); - children.add(*next); - } + deserialize(in, version); } virtual byte getCollectionType() const { return SCintermediate; } @@ -1831,7 +1819,7 @@ class CStatisticCollection : public CInterfaceOf } virtual StringBuffer & getFullScope(StringBuffer & str) const override { - if (parent) + if (parent && queryScopeType()!=SSTworkflow) { parent->getFullScope(str); str.append(':'); @@ -1916,13 +1904,13 @@ class CStatisticCollection : public CInterfaceOf } //other public interface functions - void addStatistic(StatisticKind kind, unsigned __int64 value) + virtual void addStatistic(StatisticKind kind, unsigned __int64 value) override { Statistic s(kind, value); stats.append(s); } - void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) + virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { if (mergeAction != StatsMergeAppend) { @@ -1940,14 +1928,14 @@ class CStatisticCollection : public CInterfaceOf stats.append(s); } - CStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override { //Once the CStatisticCollection is created it should not be replaced - so that returned pointers remain valid. - CStatisticCollection * match = children.find(&search); + IStatisticCollection * match = children.find(&search); if (match) return match; - CStatisticCollection * ret = new CStatisticCollection(this, search); + IStatisticCollection * ret = new CStatisticCollection(this, search); children.add(*ret); return ret; } @@ -1967,6 +1955,30 @@ class CStatisticCollection : public CInterfaceOf iter.query().serialize(out); } + virtual void deserialize(MemoryBuffer & in, unsigned version) override + { + unsigned numStats; + in.read(numStats); + stats.ensureCapacity(numStats); + while (numStats-- > 0) + { + Statistic next (in, version); + stats.append(next); + } + + unsigned numChildren; + in.read(numChildren); + children.ensure(numChildren); + while (numChildren-- > 0) + { + byte kind; + in.read(kind); + StatsScopeId childId; + childId.deserialize(in, version); + IStatisticCollection * collection = ensureSubScope(childId, true); + collection->deserialize(in, version); + } + } inline const StatsScopeId & queryScopeId() const { return id; } virtual void mergeInto(IStatisticGatherer & target) const @@ -1994,9 +2006,58 @@ class CStatisticCollection : public CInterfaceOf cur.visit(visitor); } + virtual bool refreshAggregates(std::vector & aggregateKinds, std::vector & totals) override + { + assertex(aggregateKinds.size()==totals.size()); + + bool updated = false; + if (queryScopeType()==SSTsubgraph) + { + ForEachItemIn(i, stats) + { + Statistic & stat = stats.element(i); + StatisticKind kind = stat.queryKind(); + auto iteratorVec = std::find(aggregateKinds.begin(), aggregateKinds.end(), kind); + if (iteratorVec!=aggregateKinds.end()) + { + unsigned pos = iteratorVec-aggregateKinds.begin(); + StatsMergeAction mergeAction = queryMergeMode(kind); + totals[pos] = mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction); + updated = true; + } + } + } + else + { + std::vector childTotals(aggregateKinds.size()); + for (auto & child : children) + { + if (child.refreshAggregates(aggregateKinds, childTotals)) + updated = true; + } + if (updated) + { + std::vector::iterator totalIter = totals.begin(); + std::vector::iterator subTotalIter = childTotals.begin(); + std::vector::iterator kindIter = aggregateKinds.begin(); + while (totalIter != totals.end()) + { + StatsMergeAction mergeAction = queryMergeMode(*kindIter); + updateStatistic(*kindIter, *subTotalIter, mergeAction); + (*totalIter) = mergeStatisticValue(*totalIter, *subTotalIter, mergeAction); + + ++totalIter; + ++subTotalIter; + ++kindIter; + } + } + } + return updated; + } + private: StatsScopeId id; - CStatisticCollection * parent; + IStatisticCollection * parent; protected: CollectionHashTable children; StatsArray stats; @@ -2072,11 +2133,11 @@ class CRootStatisticCollection : public CStatisticCollection { public: CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & _id) - : CStatisticCollection(NULL, _id), creatorType(_creatorType), creator(_creator) + : CStatisticCollection(nullptr, _id), creatorType(_creatorType), creator(_creator) { whenCreated = getTimeStampNowValue(); } - CRootStatisticCollection(MemoryBuffer & in, unsigned version) : CStatisticCollection(NULL, in, version) + CRootStatisticCollection(MemoryBuffer & in, unsigned version) : CStatisticCollection(nullptr, in, version) { byte creatorTypeByte; in.read(creatorTypeByte); @@ -2114,6 +2175,51 @@ class CRootStatisticCollection : public CStatisticCollection }; +StatsScopeId globalScopeId(SSTglobal, (unsigned)0); +class GlobalStatisticCollection : public CStatisticCollection +{ +public: + GlobalStatisticCollection(IPropertyTree * root) : CStatisticCollection(nullptr, globalScopeId) + { + Owned iter = root->getElements("*"); + ForEach(*iter) + { + IPropertyTree * graphPT = &iter->query(); + + Owned iter2 = graphPT->getElements("./*"); + ForEach(*iter2) + { + IPropertyTree * sgPT = & iter2->query(); + const char * sgName = sgPT->queryName(); + if (strcmp(sgName, "node")==0) + continue; + assertex(strncmp(sgName, "sg", 2)==0); + MemoryBuffer compressed; + sgPT->getPropBin("Stats", compressed); + if (!compressed.length()) + return; + + MemoryBuffer serialized; + decompressToBuffer(serialized, compressed); + unsigned version; + serialized.read(version); + byte kind; + serialized.read(kind); + + StatsScopeId id; + id.deserialize(serialized, version); + IStatisticCollection * collection = ensureSubScope(id, true); + collection->deserialize(serialized, version); + } + } + } +}; + +IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root) +{ + return new GlobalStatisticCollection(root); +} + class StatAggregator : implements IStatisticVisitor { public: @@ -2167,14 +2273,13 @@ void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * col collection->serialize(out); } -static CStatisticCollection * deserializeCollection(CStatisticCollection * parent, MemoryBuffer & in, unsigned version) +static IStatisticCollection * deserializeCollection(IStatisticCollection * parent, MemoryBuffer & in, unsigned version) { byte kind; in.read(kind); switch (kind) { case SCroot: - assertex(!parent); return new CRootStatisticCollection(in, version); case SCintermediate: return new CStatisticCollection(parent, in, version); @@ -2196,43 +2301,43 @@ IStatisticCollection * createStatisticCollection(MemoryBuffer & in) class StatisticGatherer : implements CInterfaceOf { public: - StatisticGatherer(CStatisticCollection * scope) : rootScope(scope) + StatisticGatherer(IStatisticCollection * scope) : rootScope(scope) { scopes.append(*scope); } virtual void beginScope(const StatsScopeId & id) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(id, true)); } virtual void beginActivityScope(unsigned id) override { StatsScopeId scopeId(SSTactivity, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, false)); } virtual void beginSubGraphScope(unsigned id) override { StatsScopeId scopeId(SSTsubgraph, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void beginEdgeScope(unsigned id, unsigned oid) override { StatsScopeId scopeId(SSTedge, id, oid); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, false)); } virtual void beginChildGraphScope(unsigned id) override { StatsScopeId scopeId(SSTchildgraph, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void beginChannelScope(unsigned id) override { StatsScopeId scopeId(SSTchannel, id); - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); scopes.append(*tos.ensureSubScope(scopeId, true)); } virtual void endScope() override @@ -2241,12 +2346,12 @@ class StatisticGatherer : implements CInterfaceOf } virtual void addStatistic(StatisticKind kind, unsigned __int64 value) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); tos.addStatistic(kind, value); } virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { - CStatisticCollection & tos = scopes.tos(); + IStatisticCollection & tos = scopes.tos(); tos.updateStatistic(kind, value, mergeAction); } virtual IStatisticCollection * getResult() override @@ -2255,8 +2360,8 @@ class StatisticGatherer : implements CInterfaceOf } protected: - ICopyArrayOf scopes; - Linked rootScope; + ICopyArrayOf scopes; + Linked rootScope; }; extern IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope) diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index c86d18c1937..2508fc6524b 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -104,6 +104,18 @@ interface IStatisticCollectionIterator; interface IStatisticGatherer; interface IStatisticVisitor; +enum StatsMergeAction +{ + StatsMergeKeepNonZero, + StatsMergeReplace, + StatsMergeSum, + StatsMergeMin, + StatsMergeMax, + StatsMergeAppend, + StatsMergeFirst, + StatsMergeLast, +}; + interface IStatisticCollection : public IInterface { public: @@ -123,6 +135,11 @@ interface IStatisticCollection : public IInterface virtual StringBuffer &toXML(StringBuffer &out) const = 0; virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; + virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; + virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; + virtual bool refreshAggregates(std::vector & aggregateKinds, std::vector & totals) = 0; + virtual void deserialize(MemoryBuffer & in, unsigned version) = 0; }; interface IStatisticCollectionIterator : public IIteratorOf @@ -134,17 +151,6 @@ interface IStatisticVisitor virtual bool visitScope(const IStatisticCollection & cur) = 0; // return true to iterate through children }; -enum StatsMergeAction -{ - StatsMergeKeepNonZero, - StatsMergeReplace, - StatsMergeSum, - StatsMergeMin, - StatsMergeMax, - StatsMergeAppend, - StatsMergeFirst, - StatsMergeLast, -}; interface IStatisticGatherer : public IInterface { @@ -885,6 +891,7 @@ extern jlib_decl StatisticScopeType queryScopeType(const char * sst, StatisticSc extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope); extern jlib_decl void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection); extern jlib_decl IStatisticCollection * createStatisticCollection(MemoryBuffer & in); +extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root, std::vector & aggregateKinds); inline unsigned __int64 milliToNano(unsigned __int64 value) { return value * 1000000; } // call avoids need to upcast values inline unsigned __int64 nanoToMilli(unsigned __int64 value) { return value / 1000000; } @@ -943,5 +950,5 @@ class jlib_decl RuntimeStatisticTarget : implements IStatisticTarget extern jlib_decl StringBuffer & formatMoney(StringBuffer &out, unsigned __int64 value); extern jlib_decl stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection); - +extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root); #endif diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index 5de19dced0b..e96a4ae07c9 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -69,6 +69,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer else { StatsSubgraphScope subgraph(stats, graph->queryGraphId()); + cost_type costDiskAccess = graph->getDiskAccessCost(); + if (costDiskAccess) + stats.addStatistic(StCostFileAccess, costDiskAccess); + reportGraphContents(stats, graph); } } @@ -91,16 +95,13 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer if (costLimit || finished) { const cost_type sgCost = money2cost_type(calcCost(thorManagerRate, duration) + calcCost(thorWorkerRate, duration) * numberOfMachines); - cost_type costDiskAccess = graph.getDiskAccessCost(); if (finished) { if (sgCost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostExecute, NULL, sgCost, 1, 0, StatsMergeReplace); - if (costDiskAccess) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } - const cost_type totalCost = workunitCost + sgCost + costDiskAccess; + const cost_type totalCost = workunitCost + sgCost + graph.getDiskAccessCost(); if (costLimit>0 && totalCost > costLimit) { LOG(MCwarning, thorJob, "ABORT job cost exceeds limit"); diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 65d506fbd08..b249d301b4b 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1134,7 +1134,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateSpillSize(wu, graphScope, SSTgraph); + updateAggregates(wu); removeJob(*job); } catch (IException *e)