From 0184f9645ece9e9cffd6cc68415a577c2d42ea6a Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Thu, 28 Sep 2023 14:54:46 +0100 Subject: [PATCH] Relocate GlobalStatisticCollection to workunit.cpp Prepare dynamic stats tracking by GlobalStatisticCollection Remove unnecessary calls to updateAggregates Track active graph stats for aggregation Improve performance by updating aggregates intermittently and at end of each graph Load serialized graph stats when resuming job to ensure aggregates are calculated correctly. Only update aggregates if they have changed Signed-off-by: Shamser Ahmed --- common/workunit/workunit.cpp | 169 +++++++++++++++++++++--------- common/workunit/workunit.hpp | 21 +++- common/workunit/workunit.ipp | 4 +- ecl/eclagent/eclagent.cpp | 2 - ecl/eclagent/eclagent.ipp | 3 +- ecl/eclagent/eclgraph.cpp | 11 +- plugins/cassandra/cassandrawu.cpp | 8 +- system/jlib/jstats.cpp | 161 +++++++++++++++------------- system/jlib/jstats.h | 18 +++- thorlcr/master/thdemonserver.cpp | 18 +++- thorlcr/master/thdemonserver.hpp | 2 + thorlcr/master/thgraphmanager.cpp | 6 +- 12 files changed, 275 insertions(+), 148 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 5493bd0888f..8aafdda2765 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -183,20 +183,27 @@ void doDescheduleWorkkunit(char const * wuid) * Graph progress support */ -CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge) +CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * globalStatsCollection) : creatorType(_creatorType), creator(_creator), id(_id), merge(_merge) { StatsScopeId graphScopeId; verifyex(graphScopeId.setScopeText(_rootScope)); + StatsScopeId wfScopeId(SSTworkflow,wfid); - StatsScopeId 
rootScopeId(SSTworkflow,wfid); - collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId)); + if (globalStatsCollection) + { + StatsScopeId sgScopeId(SSTsubgraph, id); + collector.setown(createStatisticsGatherer(globalStatsCollection->getCollection(wfScopeId, graphScopeId, sgScopeId, _creatorType, _creator, true))); + } + else + collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); collector->beginScope(graphScopeId); } void CWuGraphStats::beforeDispose() { collector->endScope(); + StringBuffer tag; tag.append("sg").append(id); @@ -2659,6 +2666,87 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu } } +GlobalStatisticCollection::GlobalStatisticCollection() +{ + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); +} + +void GlobalStatisticCollection::load(const char *_wuid, unsigned statsMaxDepth, bool missingScopesOnly) +{ + if (strcmp(_wuid, wuid.str())!=0) // Make sure stats collection is for this workunit + { + if (!wuid.isEmpty()) + { + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); + } + wuid.set(_wuid); + } + Owned root = getWUGraphProgress(wuid, true); + if (!root) + return; + Owned iter = root->getElements("*"); + ForEach(*iter) + { + IPropertyTree * graphPT = &iter->query(); + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphPT->queryName())); + StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt64("@wfid",0)); + Owned iter2 = graphPT->getElements("./*"); + ForEach(*iter2) + { + StatsScopeId sgScopeId; + IPropertyTree * sgPT = & iter2->query(); + const char * sgName = sgPT->queryName(); + if (strcmp(sgName, "node")==0) + continue; + verifyex(sgScopeId.setScopeText(sgName)); + + if (missingScopesOnly) // Skip any scopes that are already in the stats collection + { + IStatisticCollection * 
sgCollection = statsCollection->querySubScopePath({wfScopeId, graphScopeId, sgScopeId}); + if (sgCollection) + continue; + } + + MemoryBuffer compressed; + sgPT->getPropBin("Stats", compressed); + if (!compressed.length()) + break; + + MemoryBuffer serialized; + decompressToBuffer(serialized, compressed); + unsigned version; + serialized.read(version); + byte kind; + serialized.read(kind); + + StatsScopeId childId; + childId.deserialize(serialized, version); + statsCollection->deserializeChild(childId, serialized, version, statsMaxDepth); + } + } +} + +bool GlobalStatisticCollection::refreshAggregates(std::vector & aggregateKinds) +{ + std::vector totals(aggregateKinds.size()); + return statsCollection->refreshAggregates(aggregateKinds, totals); +} + +void GlobalStatisticCollection::visit(IStatisticVisitor & target) const +{ + statsCollection->visit(target); +} + +IStatisticCollection * GlobalStatisticCollection::getCollection(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats) +{ + IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}); + if (clearStats) + sgScopeCollection->clearStats(); + return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); +} class StatisticsAggregatesWriter : implements IStatisticVisitor { @@ -2669,6 +2757,8 @@ class StatisticsAggregatesWriter : implements IStatisticVisitor virtual bool visitScope(const IStatisticCollection & cur) { + StringBuffer s2, s3; + DBGLOG("visitScope %s -> %s", cur.getScope(s2).str(), cur.getFullScope(s3).str() ); switch (cur.queryScopeType()) { case SSTglobal: @@ -2683,51 +2773,28 @@ class StatisticsAggregatesWriter : implements IStatisticVisitor wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, 
value, 1, 0, StatsMergeReplace); } } - if (cur.queryScopeType()==SSTgraph) - return false; - else + //if (cur.queryScopeType()==SSTgraph) + // return false; + //else return true; default: - return false; + //return false; + return true; } } }; -void updateAggregates(IWorkUnit *wu) -{ - // updateAggregates() reads in the graph stats, calculates the aggregates and then writes the aggregates back to global statistics. - // - // The next iteration of this PR would be to track the stats in memory and, periodically, calculate the aggregates and write the aggregates. For Thor: - // it could do this by: - // 1) Having a single global GlobalStatisticCollection(derived from CStatisticCollection) in DeMonServer - // 2) When the jobs startup, populate the GlobalStatisticCollection with aggregates and graph stats - // 2) Replace existing code related to gathering stats from CMasterGraph and serializing with - // a new method that updates the statistics to GlobalStatisticCollection - // 3) New method for gathering and serializing graph stats: - // (i) A new class (say StatisticCollectionGatherer) would be needed with an IStatisticGatherer interface which updates the GlobalStatisticCollection - // - This StatisticCollectionGatherer would be bound to a particular graph scope (this is because the CMasterGraph::getStats expects this to be case) - // - Everytime StatisticCollectionGatherer is called with beginScope where the scope type is a subgraph, it would need to delete that scope and its children. - // This is because CMasterGraph::getStats returns a fresh full set of stats for a given subgraph. - // (ii) Create a new member function to write the stats to GraphProgress and aggregates to global stats. Possibly, - // (a) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized. 
- // (b) this member function would be called by DeMonServer at fixed intevals - // 4) Modify GlobalStatisticCollection::refreshAggregates should clear the existing aggregates before calculating new aggregates - // - this is need because multiple calls to refreshAggregates should not add to existing aggregates - // Subsequent iteration: - // 1) Serialize the aggregates into a blob in GraphProgress rather than to global stats - Owned root = getWUGraphProgress(wu->queryWuid(), true); - if (root) - { - Owned stats = createGlobalStatisticCollection(root); - std::vector aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}; - std::vector totals(aggregateKinds.size()); - - stats->refreshAggregates(aggregateKinds, totals); - - StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds); - stats->visit(statsAggregatorWriter); - } +void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection) +{ + // Further improvements: + // 1) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized. 
+ // 2) Serialize the aggregates into a blob in GraphProgress rather than to global stats + std::vector aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}; + StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds); + if(statsCollection.refreshAggregates(aggregateKinds)) // Only serialize if the aggregates have changed + statsCollection.visit(statsAggregatorWriter); } + //--------------------------------------------------------------------------------------------------------------------- @@ -3797,8 +3864,8 @@ class CDaliWorkUnit; class CDaliWuGraphStats : public CWuGraphStats { public: - CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), owner(_owner), graphName(_rootScope), wfid(_wfid) + CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid) { } protected: @@ -3812,8 +3879,8 @@ class CDaliWuGraphStats : public CWuGraphStats class CLocalWuGraphStats : public CWuGraphStats { public: - CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), graphName(_rootScope), p(_p) + CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p) { } 
protected: @@ -4117,9 +4184,9 @@ class CDaliWorkUnit : public CPersistedWorkUnit } } } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override { - return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge); + return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats); } virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) { @@ -4429,8 +4496,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa { c->setGraphState(graphName, wfid, state); } virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const { c->setNodeState(graphName, nodeId, state); } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override - { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge); } + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override + { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); } virtual void clearGraphProgress() const { c->clearGraphProgress(); } virtual IStringVal & getAbortBy(IStringVal & str) const @@ -10260,9 +10327,9 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W { throwUnexpected(); // Should only be used for persisted workunits } -IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, 
StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const +IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const { - return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge); + return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats); } void CLocalWUGraph::setName(const char *str) diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index dd1ca7f8722..053c0359126 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1175,6 +1175,23 @@ interface IConstWUScopeIterator : extends IScmIterator //--------------------------------------------------------------------------------------------------------------------- +class WORKUNIT_API GlobalStatisticCollection : public CInterface +{ +public: + GlobalStatisticCollection(); + bool refreshAggregates(std::vector & aggregateKinds); + void visit(IStatisticVisitor & target) const; + // getCollection() returns IStatisticCollection for given rootScope + // if clearStats==true then the existing stats are cleared for the given scope + IStatisticCollection * getCollection(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats); + // statsMaxDepth = load stats up until this depth (e.g. 3 means loads stats up until sg scope ) + void load(const char *_wuid, unsigned statsMaxDepth, bool missingScopesOnly); + IStatisticCollection * queryCollection() { return statsCollection; } +private: + Owned statsCollection; + StringBuffer wuid; +}; + //! IWorkUnit //! Provides high level access to WorkUnit "header" data. 
interface IWorkUnit; @@ -1302,7 +1319,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0; virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const = 0; virtual void clearGraphProgress() const = 0; virtual IStringVal & getAbortBy(IStringVal & str) const = 0; virtual unsigned __int64 getAbortTimeStamp() const = 0; @@ -1725,7 +1742,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer); extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); -extern WORKUNIT_API void updateAggregates(IWorkUnit *wu); +extern WORKUNIT_API void updateAggregates(IWorkUnit *wu, GlobalStatisticCollection & statsCollection); extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); extern WORKUNIT_API void descheduleWorkunit(char const * wuid); #if 0 diff --git a/common/workunit/workunit.ipp b/common/workunit/workunit.ipp index 4f166ea67fb..85cfc396693 100644 --- a/common/workunit/workunit.ipp +++ b/common/workunit/workunit.ipp @@ 
-232,7 +232,7 @@ public: virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const; virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override; void clearGraphProgress() const; virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit. @@ -661,7 +661,7 @@ public: class WORKUNIT_API CWuGraphStats : public CInterfaceOf { public: - CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge); + CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats); virtual void beforeDispose(); virtual IStatisticGatherer & queryStatsBuilder(); protected: diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 9a8afecd43f..961bd894fe8 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1988,7 +1988,6 @@ void EclAgent::doProcess() const cost_type cost = aggregateCost(w, nullptr, false); if (cost) w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(w); addTimings(w); switch (w->getState()) @@ -2534,7 +2533,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime const cost_type cost = 
money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(wu); } void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item) diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 10c86cc6ebb..0cc843d9c18 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -1055,7 +1055,7 @@ public: void executeLibrary(const byte * parentExtract, IHThorGraphResults * results); IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph); void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value); - + void updateAggregates(IWorkUnit* lockedwu); EclSubGraph * idToGraph(unsigned id); EclGraphElement * idToActivity(unsigned id); const char *queryGraphName() { return graphName; } @@ -1083,6 +1083,7 @@ protected: IProbeManager * probeManager; unsigned wfid; bool aborted; + GlobalStatisticCollection statsCache; }; diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index eacfad7f992..8fc9117685f 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress() Owned progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id); IStatisticGatherer & stats = progress->queryStatsBuilder(); updateProgress(stats); - if (startGraphTime || elapsedGraphCycles) { WorkunitUpdate lockedwu(agent->updateWorkUnit()); + parent.updateAggregates(lockedwu); StringBuffer subgraphid; subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id); if (startGraphTime) @@ -1278,8 +1278,6 @@ void 
EclGraph::execute(const byte * parentExtract) const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - - updateAggregates(wu); } if (agent->queryRemoteWorkunit()) @@ -1491,7 +1489,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false); + return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, &statsCache); +} + +void EclGraph::updateAggregates(IWorkUnit* lockedwu) +{ + ::updateAggregates(lockedwu, statsCache); } void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) diff --git a/plugins/cassandra/cassandrawu.cpp b/plugins/cassandra/cassandrawu.cpp index 6315ad7800b..19c89b89df9 100644 --- a/plugins/cassandra/cassandrawu.cpp +++ b/plugins/cassandra/cassandrawu.cpp @@ -2743,8 +2743,8 @@ class CCassandraWorkUnit : public CPersistedWorkUnit class CCassandraWuGraphStats : public CWuGraphStats { public: - CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), + CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), 
progress(createPTree(_rootScope)), parent(_parent) { } @@ -2764,9 +2764,9 @@ class CCassandraWorkUnit : public CPersistedWorkUnit StringAttr wuid; }; - IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override + IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override { - return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge); + return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index ec6ae7db3cb..866e1f87799 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1813,10 +1813,11 @@ class CStatisticCollection : public CInterfaceOf } virtual StringBuffer & getFullScope(StringBuffer & str) const override { - if (parent && queryScopeType()!=SSTworkflow) + if (parent && queryScopeType()!=SSTglobal) { parent->getFullScope(str); - str.append(':'); + if (!str.isEmpty()) + str.append(':'); } id.getScopeText(str); return str; @@ -1904,7 +1905,7 @@ class CStatisticCollection : public CInterfaceOf stats.append(s); } - virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { if (mergeAction != StatsMergeAppend) { @@ -1913,13 +1914,19 @@ class CStatisticCollection : public CInterfaceOf Statistic & cur = stats.element(i); if (cur.kind == kind) { + if (mergeAction==StatsMergeReplace) + { + if (cur.value==value) + return false; + } cur.value = mergeStatisticValue(cur.value, value, mergeAction); - return; + return true; } } } Statistic s(kind, value); stats.append(s); + return true; } virtual 
IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override @@ -1934,7 +1941,27 @@ class CStatisticCollection : public CInterfaceOf return ret; } - virtual void serialize(MemoryBuffer & out) const + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path) override + { + IStatisticCollection * curScope = this; + for (auto it = path.begin(); it != path.end(); ++it) + curScope = curScope->ensureSubScope(*it, true); // n.b. this will always return a valid pointer + return curScope; + } + + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) override + { + IStatisticCollection * curScope = this; + for (auto it = path.begin(); it != path.end(); ++it) + { + curScope = children.find(&(*it)); + if (!curScope) + return nullptr; + } + return curScope; + } + + virtual void serialize(MemoryBuffer & out) const override { out.append(getCollectionType()); id.serialize(out); @@ -1949,7 +1976,7 @@ class CStatisticCollection : public CInterfaceOf iter.query().serialize(out); } - virtual void deserialize(MemoryBuffer & in, unsigned version) override + virtual void deserialize(MemoryBuffer & in, unsigned version, unsigned statsMaxDepth=0) override { unsigned numStats; in.read(numStats); @@ -1957,22 +1984,28 @@ class CStatisticCollection : public CInterfaceOf while (numStats-- > 0) { Statistic next (in, version); - stats.append(next); + if (statsMaxDepth==0) stats.append(next); } - unsigned numChildren; in.read(numChildren); children.ensure(numChildren); + statsMaxDepth = (statsMaxDepth > 0) ? 
(statsMaxDepth - 1) : 0; while (numChildren-- > 0) { byte kind; in.read(kind); StatsScopeId childId; childId.deserialize(in, version); - IStatisticCollection * collection = ensureSubScope(childId, true); - collection->deserialize(in, version); + deserializeChild(childId, in, version, statsMaxDepth); } } + + virtual void deserializeChild(StatsScopeId childId, MemoryBuffer & in, unsigned version, unsigned statsMaxDepth=0) override + { + IStatisticCollection * childCollection = ensureSubScope(childId, true); + childCollection->deserialize(in, version, statsMaxDepth); + } + inline const StatsScopeId & queryScopeId() const { return id; } virtual void mergeInto(IStatisticGatherer & target) const @@ -2015,8 +2048,7 @@ class CStatisticCollection : public CInterfaceOf if (iteratorVec!=aggregateKinds.end()) { unsigned pos = iteratorVec-aggregateKinds.begin(); - StatsMergeAction mergeAction = queryMergeMode(kind); - totals[pos] = mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction); + totals[pos] += stat.queryValue(); updated = true; } } @@ -2031,14 +2063,15 @@ class CStatisticCollection : public CInterfaceOf } if (updated) { + updated=false; std::vector::iterator totalIter = totals.begin(); std::vector::iterator subTotalIter = childTotals.begin(); std::vector::iterator kindIter = aggregateKinds.begin(); while (totalIter != totals.end()) { - StatsMergeAction mergeAction = queryMergeMode(*kindIter); - updateStatistic(*kindIter, *subTotalIter, mergeAction); - (*totalIter) = mergeStatisticValue(*totalIter, *subTotalIter, mergeAction); + if (updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace)) + updated=true; + (*totalIter) += *subTotalIter; ++totalIter; ++subTotalIter; @@ -2048,7 +2081,18 @@ class CStatisticCollection : public CInterfaceOf } return updated; } - + virtual void clearStats() + { + stats.clear(true); + // Note: Children should NOT be deleted as all pointers returned by ensureSubScope must still be valid + SuperHashIteratorOf iter(children, false); 
+ for (iter.first(); iter.isValid(); iter.next()) + iter.query().clearStats(); + } + virtual void addChild(IStatisticCollection *stats) + { + children.add(*LINK(stats)); + } private: StatsScopeId id; IStatisticCollection * parent; @@ -2126,8 +2170,14 @@ bool CollectionHashTable::matchesElement(const void *et, const void *searchET) c class CRootStatisticCollection : public CStatisticCollection { public: - CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & _id) - : CStatisticCollection(nullptr, _id), creatorType(_creatorType), creator(_creator) + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId, const StatsScopeId & graphScopeId, IStatisticCollection * sgCollection) + : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) + { + whenCreated = getTimeStampNowValue(); + IStatisticCollection * child = ensureSubScope(graphScopeId, true); + child->addChild(sgCollection); + } + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId) : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) { whenCreated = getTimeStampNowValue(); } @@ -2140,13 +2190,13 @@ class CRootStatisticCollection : public CStatisticCollection in.read(whenCreated); } - virtual byte getCollectionType() const { return SCroot; } + virtual byte getCollectionType() const override { return SCroot; } - virtual unsigned __int64 queryWhenCreated() const + virtual unsigned __int64 queryWhenCreated() const override { return whenCreated; } - virtual void serialize(MemoryBuffer & out) const + virtual void serialize(MemoryBuffer & out) const override { CStatisticCollection::serialize(out); out.append((byte)creatorType); @@ -2168,52 +2218,6 @@ class CRootStatisticCollection : public CStatisticCollection unsigned __int64 whenCreated; }; - -StatsScopeId 
globalScopeId(SSTglobal, (unsigned)0); -class GlobalStatisticCollection : public CStatisticCollection -{ -public: - GlobalStatisticCollection(IPropertyTree * root) : CStatisticCollection(nullptr, globalScopeId) - { - Owned iter = root->getElements("*"); - ForEach(*iter) - { - IPropertyTree * graphPT = &iter->query(); - - Owned iter2 = graphPT->getElements("./*"); - ForEach(*iter2) - { - IPropertyTree * sgPT = & iter2->query(); - const char * sgName = sgPT->queryName(); - if (strcmp(sgName, "node")==0) - continue; - assertex(strncmp(sgName, "sg", 2)==0); - MemoryBuffer compressed; - sgPT->getPropBin("Stats", compressed); - if (!compressed.length()) - return; - - MemoryBuffer serialized; - decompressToBuffer(serialized, compressed); - unsigned version; - serialized.read(version); - byte kind; - serialized.read(kind); - - StatsScopeId id; - id.deserialize(serialized, version); - IStatisticCollection * collection = ensureSubScope(id, true); - collection->deserialize(serialized, version); - } - } - } -}; - -IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root) -{ - return new GlobalStatisticCollection(root); -} - class StatAggregator : implements IStatisticVisitor { public: @@ -2289,6 +2293,16 @@ IStatisticCollection * createStatisticCollection(MemoryBuffer & in) return deserializeCollection(NULL, in, version); } +IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, StatsScopeId scopeId) +{ + return new CStatisticCollection(parent, scopeId); +} + +IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScopeId, IStatisticCollection * childCollection) +{ + //creator unused at the moment. 
+ return new CRootStatisticCollection(creatorType, creator, rootScope, graphScopeId, childCollection); +} //-------------------------------------------------------------------------------------------------------------------- @@ -2360,9 +2374,12 @@ class StatisticGatherer : implements CInterfaceOf extern IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope) { - //creator unused at the moment. - Owned rootCollection = new CRootStatisticCollection(creatorType, creator, rootScope); - return new StatisticGatherer(rootCollection); + return new StatisticGatherer(new CRootStatisticCollection(creatorType, creator, rootScope)); +} + +extern IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats) +{ + return new StatisticGatherer(stats); } //-------------------------------------------------------------------------------------------------------------------- @@ -3926,9 +3943,9 @@ void StatisticsFilter::setScopeDepth(unsigned _scopeDepth) scopeFilter.setDepth(_scopeDepth); } -void StatisticsFilter::setScopeDepth(unsigned _minScopeDepth, unsigned _maxScopeDepth) +void StatisticsFilter::setScopeDepth(unsigned _maxDepthDepth, unsigned _maxScopeDepth) { - scopeFilter.setDepth(_minScopeDepth, _maxScopeDepth); + scopeFilter.setDepth(_maxDepthDepth, _maxScopeDepth); } void StatisticsFilter::setScope(const char * _scope) diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 138a9e19dff..4958fe01949 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -136,10 +136,15 @@ interface IStatisticCollection : public IInterface virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path) = 0; + virtual IStatisticCollection * 
querySubScopePath(std::initializer_list path) = 0; virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; - virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; virtual bool refreshAggregates(std::vector & aggregateKinds, std::vector & totals) = 0; - virtual void deserialize(MemoryBuffer & in, unsigned version) = 0; + virtual void deserialize(MemoryBuffer & in, unsigned version, unsigned statsMaxDepth) = 0; + virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, unsigned statsMaxDepth) = 0; + virtual void clearStats() = 0; + virtual void addChild(IStatisticCollection *stats) = 0; }; interface IStatisticCollectionIterator : public IIteratorOf @@ -402,8 +407,8 @@ class jlib_decl StatisticsFilter : public CInterface void setCreator(const char * _creator); void setCreatorType(StatisticCreatorType _creatorType); void setFilter(const char * filter); - void setScopeDepth(unsigned _minScopeDepth); - void setScopeDepth(unsigned _minScopeDepth, unsigned _maxScopeDepth); + void setScopeDepth(unsigned _maxDepthDepth); + void setScopeDepth(unsigned _maxDepthDepth, unsigned _maxScopeDepth); void setScope(const char * _scope); void setScopeType(StatisticScopeType _scopeType); void setValueRange(unsigned __int64 minValue, unsigned __int64 _maxValue); @@ -888,9 +893,12 @@ extern jlib_decl StatisticCreatorType queryCreatorType(const char * sct, Statist extern jlib_decl StatisticScopeType queryScopeType(const char * sst, StatisticScopeType dft); extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope); +extern jlib_decl IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats); +extern jlib_decl IStatisticCollection * 
createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScope, IStatisticCollection * childCollection=nullptr); extern jlib_decl void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection); extern jlib_decl IStatisticCollection * createStatisticCollection(MemoryBuffer & in); -extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root, std::vector & aggregateKinds); +extern jlib_decl IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, StatsScopeId scopeId); +// extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root, std::vector & aggregateKinds); inline unsigned __int64 milliToNano(unsigned __int64 value) { return value * 1000000; } // call avoids need to upcast values inline unsigned __int64 nanoToMilli(unsigned __int64 value) { return value / 1000000; } diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index e96a4ae07c9..8579ba74684 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -43,6 +43,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned numberOfMachines = 0; cost_type costLimit = 0; cost_type workunitCost = 0; + GlobalStatisticCollection globalStatsCollection; void doReportGraph(IStatisticGatherer & stats, CGraphBase *graph) { @@ -146,7 +147,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer ForEachItemIn (g, activeGraphs) { CGraphBase &graph = activeGraphs.item(g); - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, &globalStatsCollection); reportGraph(stats->queryStatsBuilder(), &graph); } Owned wu = &currentWU.lock(); @@ -156,6 
+157,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); } + ::updateAggregates(wu, globalStatsCollection); queryServerStatus().commitProperties(); } catch (IException *E) @@ -174,7 +176,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName(); unsigned wfid = graph->queryJob().getWfid(); { - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, &globalStatsCollection); reportGraph(stats->queryStatsBuilder(), graph); } @@ -187,7 +189,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StWhenStarted, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend); } reportStatus(wu, *graph, startTime, finished, success); - queryServerStatus().commitProperties(); } catch (IException *e) @@ -301,6 +302,17 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer reportActiveGraphs(true, false); activeGraphs.kill(); } + virtual void updateAggregates(IWorkUnit * lockedWu) override + { + ::updateAggregates(lockedWu, globalStatsCollection); + } + virtual void loadStats(const char *wuid) override + { + // load stats from GraphProgress + // - ignore stats below subgraph (statsMaxDepth=3) + // - ignore any scopes that have already been loaded into global collection + globalStatsCollection.load(wuid, 3, true); + } }; diff --git a/thorlcr/master/thdemonserver.hpp b/thorlcr/master/thdemonserver.hpp index 32453c92764..585d7b18dfc 100644 --- a/thorlcr/master/thdemonserver.hpp +++ b/thorlcr/master/thdemonserver.hpp @@ -30,6 +30,8 @@ 
interface IDeMonServer : extends IInterface virtual void startGraph(CGraphBase *graph) = 0; virtual void endGraph(CGraphBase *graph, bool success) = 0; virtual void endGraphs() = 0; + virtual void updateAggregates(IWorkUnit * lockedWu) = 0; + virtual void loadStats(const char *wuid) = 0; }; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 5c114da2c55..5ffcfa4d5ca 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1086,7 +1086,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, } setWuid(workunit.queryWuid(), workunit.queryClusterName()); - + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->loadStats(workunit.queryWuid()); allDone = job->go(); Owned wu = &workunit.lock(); @@ -1101,7 +1102,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(wu); + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->updateAggregates(wu); removeJob(*job); } catch (IException *e)