From 49fde9e4d7393d9f0a44481d5c3dc24fe33cbf75 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 24 Oct 2023 23:07:38 +0100 Subject: [PATCH] HPCC-29657 Changes following review Signed-off-by: Shamser Ahmed --- common/workunit/workunit.cpp | 59 +++++++++++++++++-------------- common/workunit/workunit.hpp | 8 ++--- common/workunit/workunit.ipp | 4 +-- ecl/eclagent/eclagent.ipp | 2 +- ecl/eclagent/eclgraph.cpp | 7 ++-- plugins/cassandra/cassandrawu.cpp | 4 +-- system/jlib/jstats.cpp | 42 +++++++++++++--------- system/jlib/jstats.h | 12 ++++--- thorlcr/master/thdemonserver.cpp | 15 +++++--- thorlcr/master/thdemonserver.hpp | 2 +- thorlcr/master/thgraphmanager.cpp | 2 +- 11 files changed, 92 insertions(+), 65 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index cb71ef35352..9b64f4c9f20 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -183,18 +183,15 @@ void doDescheduleWorkkunit(char const * wuid) * Graph progress support */ -CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * globalStatsCollection) +CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * statsCollection) : creatorType(_creatorType), creator(_creator), id(_id), merge(_merge) { StatsScopeId graphScopeId; verifyex(graphScopeId.setScopeText(_rootScope)); StatsScopeId wfScopeId(SSTworkflow,wfid); - if (globalStatsCollection) - { - StatsScopeId sgScopeId(SSTsubgraph, id); - collector.setown(createStatisticsGatherer(globalStatsCollection->getCollectionForUpdate(wfScopeId, graphScopeId, sgScopeId, _creatorType, _creator, true))); - } + if (statsCollection) + collector.setown(createStatisticsGatherer(statsCollection)); else collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); collector->beginScope(graphScopeId); @@ -2672,7 +2669,7 @@ GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(a statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); } -void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly) +void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) { const char * _wuid = workunit.queryWuid(); if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit @@ -2702,14 +2699,6 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap if (strcmp(sgName, "node")==0) continue; verifyex(sgScopeId.setScopeText(sgName)); - - if (missingScopesOnly) // Skip scopes that already in collection - { - IStatisticCollection * sgCollection = statsCollection->querySubScopePath({wfScopeId, graphScopeId, sgScopeId}); - if (sgCollection) - continue; - } - MemoryBuffer compressed; sgPT->getPropBin("Stats", compressed); if (!compressed.length()) @@ -2728,7 +2717,7 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap if (aggregatesOnly) { // Only store stats for subgraph level - statsMinDepth = 3; + statsMinDepth = 3; // this is subgraph level statsMaxDepth = 3; } statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth); @@ -2755,11 +2744,11 @@ void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit) }; WuScopeFilter filter; - filter.addScopeType(SSTglobal).addScopeType(SSTgraph).addScopeType(SSTsubgraph); + filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph); const unsigned numStats = aggregateKindsMapping.numStatistics(); for (unsigned i=0; iensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, &wasCreated); - if (clearStats && wasCreated) + + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphName)); + StatsScopeId wfScopeId(SSTworkflow, wfid); + StatsScopeId sgScopeId(SSTsubgraph, sgId); + + IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, wasCreated); + if (clearStats && !wasCreated) sgScopeCollection->clearStats(); sgScopeCollection->markDirty(); return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); @@ -2834,6 +2829,18 @@ void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu) } } +// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation) +void GlobalStatisticCollection::pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId) +{ + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphName)); + StatsScopeId wfScopeId(SSTworkflow, wfid); + StatsScopeId sgScopeId(SSTsubgraph, sgId); + IStatisticCollection * sgScopeCollection = statsCollection->querySubScopePath({wfScopeId,graphScopeId, sgScopeId}); + if (sgScopeCollection) + sgScopeCollection->pruneChildStats(); +} + //--------------------------------------------------------------------------------------------------------------------- @@ -3903,7 +3910,7 @@ class CDaliWorkUnit; class CDaliWuGraphStats : public CWuGraphStats { public: - CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid) { } @@ -3918,7 +3925,7 @@ class CDaliWuGraphStats : public CWuGraphStats class CLocalWuGraphStats : public CWuGraphStats { public: - CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p) { } @@ -4223,7 +4230,7 @@ class CDaliWorkUnit : public CPersistedWorkUnit } } } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override { return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats); } @@ -4535,7 +4542,7 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa { c->setGraphState(graphName, wfid, state); } virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const { c->setNodeState(graphName, nodeId, state); } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); } virtual void clearGraphProgress() const { c->clearGraphProgress(); } @@ -10366,7 +10373,7 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W { throwUnexpected(); // Should only be used for persisted workunits } -IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const +IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const { return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats); } diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index de3c215a256..544478d1b72 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1203,7 +1203,6 @@ interface IConstWorkUnitInfo : extends IInterface virtual IConstWUAppValueIterator & getApplicationValues() const = 0; }; -class GlobalStatisticCollection; interface IConstWorkUnit : extends IConstWorkUnitInfo { virtual bool aborting() const = 0; @@ -1302,7 +1301,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0; virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const = 0; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const = 0; virtual void clearGraphProgress() const = 0; virtual IStringVal & getAbortBy(IStringVal & str) const = 0; virtual unsigned __int64 getAbortTimeStamp() const = 0; @@ -1789,12 +1788,13 @@ class WORKUNIT_API GlobalStatisticCollection : public CInterface public: GlobalStatisticCollection(); - void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly); + void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly); void loadGlobalAggregates(IConstWorkUnit &workunit); - IStatisticCollection * getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats); + IStatisticCollection * getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats); bool refreshAggregates(); IStatisticCollection * queryCollection() { return statsCollection; } void updateAggregates(IWorkUnit *wu); + void pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId); private: Owned statsCollection; const StatisticsMapping & aggregateKindsMapping; diff --git a/common/workunit/workunit.ipp b/common/workunit/workunit.ipp index 85cfc396693..46ea428c84a 100644 --- a/common/workunit/workunit.ipp +++ b/common/workunit/workunit.ipp @@ -232,7 +232,7 @@ public: virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const; virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override; void clearGraphProgress() const; virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit. @@ -661,7 +661,7 @@ public: class WORKUNIT_API CWuGraphStats : public CInterfaceOf { public: - CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats); + CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats); virtual void beforeDispose(); virtual IStatisticGatherer & queryStatsBuilder(); protected: diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index ae2220d37e5..20b73dced30 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -1048,7 +1048,7 @@ public: graphAgentContext.set(&_agent); agent = &graphAgentContext; aborted = false; - globalStats.load(*wu, graphName, true, true); + globalStats.load(*wu, graphName, true); } void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml); diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index a51517a4c10..ced91432092 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -1346,7 +1346,9 @@ void EclGraph::updateLibraryProgress() { EclSubGraph & cur = graphs.item(idx); unsigned wfid = cur.parent.queryWfid(); - Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, &globalStats); + + Owned sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id, true); // true=>clear existing stats + Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, sgCollection.getClear()); cur.updateProgress(progress->queryStatsBuilder()); } } @@ -1489,7 +1491,8 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, &globalStats); + Owned sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, queryGraphName(), subgraph, true); // true=>clear existing stats + return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, sgCollection.getClear()); } void EclGraph::updateAggregates(IWorkUnit* lockedwu) diff --git a/plugins/cassandra/cassandrawu.cpp b/plugins/cassandra/cassandrawu.cpp index 19c89b89df9..94f8439e7ad 100644 --- a/plugins/cassandra/cassandrawu.cpp +++ b/plugins/cassandra/cassandrawu.cpp @@ -2743,7 +2743,7 @@ class CCassandraWorkUnit : public CPersistedWorkUnit class CCassandraWuGraphStats : public CWuGraphStats { public: - CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), progress(createPTree(_rootScope)), parent(_parent) { @@ -2764,7 +2764,7 @@ class CCassandraWorkUnit : public CPersistedWorkUnit StringAttr wuid; }; - IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override + IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override { return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 2390763b677..73378f24132 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1974,43 +1974,52 @@ class CStatisticCollection : public CInterfaceOf virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) { bool wasCreated; - return ensureSubScope(search, hasChildren, &wasCreated); + return ensureSubScope(search, hasChildren, wasCreated); } - virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool * wasCreated) override + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) override { //Once the CStatisticCollection is created it should not be replaced - so that returned pointers remain valid. - *wasCreated = false; + wasCreated = false; IStatisticCollection * match = children.find(&search); if (match) return match; IStatisticCollection * ret = new CStatisticCollection(this, search); children.add(*ret); - *wasCreated=true; + wasCreated = true; return ret; } + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) override + { + return children.find(&search); + } - virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool * wasCreated) override + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool & wasCreated) override { IStatisticCollection * curScope = this; - // interested only in the 'wasCreated' for the deepest scope (i.e. the only updated last) - for (auto it = path.begin(); it != path.end(); ++it) - curScope = curScope->ensureSubScope(*it, true, wasCreated); // n.b. this will always return a valid pointer + for (const auto scopeItem: path) + curScope = curScope->ensureSubScope(scopeItem, true, wasCreated); // n.b. this will always return a valid pointer return curScope; } - virtual IStatisticCollection * querySubScopePath(std::initializer_list path) override + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) override { IStatisticCollection * curScope = this; - for (auto it = path.begin(); it != path.end(); ++it) + for (const auto scopeItem: path) { - curScope = children.find(&(*it)); + curScope = curScope->querySubScope(scopeItem); if (!curScope) - return nullptr; + break; } return curScope; } + // Note, once this is called child scope pointers will be invalid + virtual void pruneChildStats() override + { + children.kill(); + } + virtual void serialize(MemoryBuffer & out) const override { out.append(getCollectionType()); @@ -2050,7 +2059,7 @@ class CStatisticCollection : public CInterfaceOf } } - virtual void deserializeChild(const StatsScopeId childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override + virtual void deserializeChild(const StatsScopeId & childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override { if (maxDepth > 0) { @@ -2183,7 +2192,8 @@ StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const SuperHashIteratorOf iter(children, false); for (iter.first(); iter.isValid(); iter.next()) iter.query().toXML(out); - out.append("\n"); + out.append(" // Scope id=\""); + id.getScopeText(out).append("\"\n"); return out; } @@ -2360,7 +2370,7 @@ IStatisticCollection * createStatisticCollection(MemoryBuffer & in) return deserializeCollection(NULL, in, version); } -IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, StatsScopeId scopeId) +IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, const StatsScopeId & scopeId) { return new CStatisticCollection(parent, scopeId); } @@ -2792,7 +2802,7 @@ StringBuffer & CRuntimeStatisticCollection::toStr(StringBuffer &str) const unsigned __int64 rawValue = getStatisticValue(rawKind); if (rawValue) value += convertMeasure(rawKind, kind, rawValue); - } + } if (value) { const char * name = queryStatisticName(serialKind); diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 4a28632cb50..edf369ffcaf 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -140,14 +140,16 @@ interface IStatisticCollection : public IInterface virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; - virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool * wasCreated) = 0; - virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool * wasCreated) = 0; - virtual IStatisticCollection * querySubScopePath(std::initializer_list path) = 0; + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) = 0; + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) = 0; + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool & wasCreated) = 0; + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) = 0; + virtual void pruneChildStats() = 0; virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; virtual bool refreshAggregates(CRuntimeStatisticCollection & totals, IBitSet & isTotalUpdated) = 0; virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; - virtual void deserializeChild(const StatsScopeId scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; + virtual void deserializeChild(const StatsScopeId & scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; virtual void clearStats() = 0; virtual void addChild(IStatisticCollection *stats) = 0; virtual void markDirty() = 0; @@ -905,7 +907,7 @@ extern jlib_decl IStatisticGatherer * createStatisticsGatherer(IStatisticCollect extern jlib_decl IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScope, IStatisticCollection * childCollection=nullptr); extern jlib_decl void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection); extern jlib_decl IStatisticCollection * createStatisticCollection(MemoryBuffer & in); -extern jlib_decl IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, StatsScopeId scopeId); +extern jlib_decl IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, const StatsScopeId & scopeId); inline unsigned __int64 milliToNano(unsigned __int64 value) { return value * 1000000; } // call avoids need to upcast values inline unsigned __int64 nanoToMilli(unsigned __int64 value) { return value / 1000000; } diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index 9d034f8061a..0c2b12ddb4c 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -143,7 +143,8 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer ForEachItemIn (g, activeGraphs) { CGraphBase &graph = activeGraphs.item(g); - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, &globalStatsCollection); + Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph.queryGraphId(), true); // true=>clear existing stats + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection.getClear()); reportGraph(stats->queryStatsBuilder(), &graph); } Owned wu = ¤tWU.lock(); @@ -152,6 +153,8 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer CGraphBase &graph = activeGraphs.item(g2); unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); + // Prune subgraph descendant stats as they have been serialized and no longer needed. + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId()); } updateAggregates(wu); queryServerStatus().commitProperties(); @@ -172,10 +175,12 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName(); unsigned wfid = graph->queryJob().getWfid(); { - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, &globalStatsCollection); + Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph->queryGraphId(), true); // true=>clear existing stats + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection.getClear()); reportGraph(stats->queryStatsBuilder(), graph); } - + // Prune subgraph descendant stats as they have been serialized and no longer needed. + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId()); Owned wu = ¤tWU.lock(); if (startTimeStamp) { @@ -302,9 +307,9 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { globalStatsCollection.updateAggregates(lockedWu); } - virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly) override + virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) override { - globalStatsCollection.load(workunit, graphName, aggregatesOnly, missingScopesOnly); + globalStatsCollection.load(workunit, graphName, aggregatesOnly); } }; diff --git a/thorlcr/master/thdemonserver.hpp b/thorlcr/master/thdemonserver.hpp index 69cd8c8fa14..70cfa5a97b7 100644 --- a/thorlcr/master/thdemonserver.hpp +++ b/thorlcr/master/thdemonserver.hpp @@ -32,7 +32,7 @@ interface IDeMonServer : extends IInterface virtual void endGraph(CGraphBase *graph, bool success) = 0; virtual void endGraphs() = 0; virtual void updateAggregates(IWorkUnit * lockedWu) = 0; - virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly) = 0; + virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly) = 0; }; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 689f434a477..11db5322847 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1118,7 +1118,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, podInfo.report(wu); } if (globals->getPropBool("@watchdogProgressEnabled")) - queryDeMonServer()->loadStats(workunit, graphName, true, true); + queryDeMonServer()->loadStats(workunit, graphName, true); setWuid(workunit.queryWuid(), workunit.queryClusterName());