From b5c754bad6f9cb661bec3f8d1c4041403f6d3a17 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Thu, 28 Sep 2023 14:54:46 +0100 Subject: [PATCH] HPCC-29657 Produce aggregate stats Signed-off-by: Shamser Ahmed --- common/workunit/workunit.cpp | 241 +++++++++++++++++++++--------- common/workunit/workunit.hpp | 23 ++- common/workunit/workunit.ipp | 4 +- ecl/eclagent/eclagent.cpp | 2 - ecl/eclagent/eclagent.ipp | 4 +- ecl/eclagent/eclgraph.cpp | 13 +- plugins/cassandra/cassandrawu.cpp | 8 +- system/jlib/jstats.cpp | 218 ++++++++++++++++++--------- system/jlib/jstats.h | 20 ++- thorlcr/master/thdemonserver.cpp | 15 +- thorlcr/master/thdemonserver.hpp | 3 + thorlcr/master/thgraphmanager.cpp | 6 +- vcpkg | 2 +- 13 files changed, 389 insertions(+), 170 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 1c79fdbe821..4cfbcee342f 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -183,14 +183,20 @@ void doDescheduleWorkkunit(char const * wuid) * Graph progress support */ -CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge) +CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * globalStatsCollection) : creatorType(_creatorType), creator(_creator), id(_id), merge(_merge) { StatsScopeId graphScopeId; verifyex(graphScopeId.setScopeText(_rootScope)); + StatsScopeId wfScopeId(SSTworkflow,wfid); - StatsScopeId rootScopeId(SSTworkflow,wfid); - collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId)); + if (globalStatsCollection) + { + StatsScopeId sgScopeId(SSTsubgraph, id); + collector.setown(createStatisticsGatherer(globalStatsCollection->getCollectionForUpdate(wfScopeId, graphScopeId, sgScopeId, _creatorType, _creator, true))); + } + else + collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); collector->beginScope(graphScopeId); } @@ -2657,77 +2663,174 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu } return totalCost; } -} +}; +GlobalStatisticCollection::GlobalStatisticCollection() +{ + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); +} -class StatisticsAggregatesWriter : implements IStatisticVisitor +void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly) { - std::vector & aggregateKinds; - Linked wu; -public: - StatisticsAggregatesWriter(IWorkUnit * _wu, std::vector & _aggregateKinds): wu(_wu), aggregateKinds(_aggregateKinds) {} + const char * _wuid = workunit.queryWuid(); + if (strcmp(_wuid, wuid.str())!=0) // Make sure stats collection is for this workunit + { + if (!wuid.isEmpty()) + { + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); + } + wuid.set(_wuid); + } - virtual bool visitScope(const IStatisticCollection & cur) + loadGlobalAggregates(workunit); + Owned root = getWUGraphProgress(wuid, true); + if (root) { - switch (cur.queryScopeType()) + Owned graphPT = root->getPropTree(graphName); + if (!graphPT) + return; + + StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0)); + StatsScopeId graphScopeId(SSTgraph, graphName); + + Owned iter = graphPT->getElements("./*"); + ForEach(*iter) { - case SSTglobal: - case SSTworkflow: - case SSTgraph: - for (auto kind: aggregateKinds) + StatsScopeId sgScopeId; + IPropertyTree * sgPT = & iter->query(); + const char * sgName = sgPT->queryName(); + if (strcmp(sgName, "node")==0) + continue; + verifyex(sgScopeId.setScopeText(sgName)); + + if (missingScopesOnly) // Skip scopes that already in collection { - stat_type value; - if (cur.getStatistic(kind, value) && value) - { - StringBuffer s; - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace); - } + IStatisticCollection * sgCollection = statsCollection->querySubScopePath({wfScopeId, graphScopeId, sgScopeId}); + if (sgCollection) + continue; } - if (cur.queryScopeType()==SSTgraph) - return false; - else - return true; - default: - return false; + + MemoryBuffer compressed; + sgPT->getPropBin("Stats", compressed); + if (!compressed.length()) + break; + MemoryBuffer serialized; + decompressToBuffer(serialized, compressed); + + unsigned version; + serialized.read(version); + byte kind; + serialized.read(kind); + + StatsScopeId childId; + childId.deserialize(serialized, version); + int statsMinDepth = 0, statsMaxDepth = INT_MAX; + if (aggregatesOnly) + { + statsMinDepth = 3; + statsMaxDepth = 3; + } + // deserialize the subgraph stats, excluding any descendant stats + // Note: not all sg stats required for generating aggregates, so it would be + // more efficient to deserialize just the stats in the aggregateKinds list. + statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth); } } -}; +} -void updateAggregates(IWorkUnit *wu) -{ - // updateAggregates() reads in the graph stats, calculates the aggregates and then writes the aggregates back to global statistics. - // - // The next iteration of this PR would be to track the stats in memory and, periodically, calculate the aggregates and write the aggregates. For Thor: - // it could do this by: - // 1) Having a single global GlobalStatisticCollection(derived from CStatisticCollection) in DeMonServer - // 2) When the jobs startup, populate the GlobalStatisticCollection with aggregates and graph stats - // 2) Replace existing code related to gathering stats from CMasterGraph and serializing with - // a new method that updates the statistics to GlobalStatisticCollection - // 3) New method for gathering and serializing graph stats: - // (i) A new class (say StatisticCollectionGatherer) would be needed with an IStatisticGatherer interface which updates the GlobalStatisticCollection - // - This StatisticCollectionGatherer would be bound to a particular graph scope (this is because the CMasterGraph::getStats expects this to be case) - // - Everytime StatisticCollectionGatherer is called with beginScope where the scope type is a subgraph, it would need to delete that scope and its children. - // This is because CMasterGraph::getStats returns a fresh full set of stats for a given subgraph. - // (ii) Create a new member function to write the stats to GraphProgress and aggregates to global stats. Possibly, - // (a) maintain a dirty flag for each subgraph scope so that only modified subgraphs are serialized. - // (b) this member function would be called by DeMonServer at fixed intevals - // 4) Modify GlobalStatisticCollection::refreshAggregates should clear the existing aggregates before calculating new aggregates - // - this is need because multiple calls to refreshAggregates should not add to existing aggregates - // Subsequent iteration: - // 1) Serialize the aggregates into a blob in GraphProgress rather than to global stats - Owned root = getWUGraphProgress(wu->queryWuid(), true); - if (root) +void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit) +{ + class StatsCollectionAggregatesLoader : public IWuScopeVisitor { - Owned stats = createGlobalStatisticCollection(root); - std::vector aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}; - std::vector totals(aggregateKinds.size()); + public: + StatsCollectionAggregatesLoader(IStatisticCollection * _statsCollection) : statsCollection(_statsCollection) {} - stats->refreshAggregates(aggregateKinds, totals); + virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override + { + statsCollection->setStatistic(extra.queryScope(), kind, value); + } + virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); } + virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); } + virtual void noteException(IConstWUException & exception) { throwUnexpected(); } + private: + Linked statsCollection; + }; - StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds); - stats->visit(statsAggregatorWriter); - } + WuScopeFilter filter; + filter.addScopeType(SSTglobal).addScopeType(SSTgraph).addScopeType(SSTsubgraph); + for(auto aggregateKind: aggregateKinds) + filter.addOutputStatistic(aggregateKind); + filter.setDepth(1,3); + filter.setSources(SSFsearchGlobalStats); + filter.setIncludeNesting(0); + filter.finishedFilter(); + + StatsCollectionAggregatesLoader aggregatesLoader(statsCollection); + Owned iter = &workunit.getScopeIterator(filter); + ForEach(*iter) + iter->playProperties(aggregatesLoader); } + +// getCollectionForUpdate() returns IStatisticCollection for the given rootScope +// if clearStats==true then the existing stats are cleared for the given scope +IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats) +{ + IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}); + if (clearStats) + sgScopeCollection->clearStats(); + sgScopeCollection->markDirty(); + return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); +} + +// Re-calculated aggregates from subgraph scope +bool GlobalStatisticCollection::refreshAggregates() +{ + std::vector totals(aggregateKinds.size()); + return statsCollection->refreshAggregates(aggregateKinds, totals); +} + +// Refresh aggregates and write them to global stats +void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu) +{ + class StatisticsAggregatesWriter : implements IStatisticVisitor + { + std::vector & aggregateKinds; + Linked wu; + public: + StatisticsAggregatesWriter(IWorkUnit * _wu, std::vector & _aggregateKinds): wu(_wu), aggregateKinds(_aggregateKinds) {} + + virtual bool visitScope(const IStatisticCollection & cur) + { + switch (cur.queryScopeType()) + { + case SSTglobal: + case SSTworkflow: + case SSTgraph: + for (auto kind: aggregateKinds) + { + stat_type value; + if (cur.getStatistic(kind, value) && value) + { + StringBuffer s; + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), cur.queryScopeType(), cur.getFullScope(s).str(), kind, nullptr, value, 1, 0, StatsMergeReplace); + } + } + if (cur.queryScopeType()==SSTgraph) + return false; + else + return true; + default: + return false; + } + } + }; + StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKinds); + if(refreshAggregates()) // Only serialize if the aggregates has changed + statsCollection->visit(statsAggregatorWriter); +} + //--------------------------------------------------------------------------------------------------------------------- @@ -3797,8 +3900,8 @@ class CDaliWorkUnit; class CDaliWuGraphStats : public CWuGraphStats { public: - CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), owner(_owner), graphName(_rootScope), wfid(_wfid) + CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid) { } protected: @@ -3812,8 +3915,8 @@ class CDaliWuGraphStats : public CWuGraphStats class CLocalWuGraphStats : public CWuGraphStats { public: - CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), graphName(_rootScope), p(_p) + CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p) { } protected: @@ -4117,9 +4220,9 @@ class CDaliWorkUnit : public CPersistedWorkUnit } } } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override { - return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge); + return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats); } virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) { @@ -4429,8 +4532,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa { c->setGraphState(graphName, wfid, state); } virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const { c->setNodeState(graphName, nodeId, state); } - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override - { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge); } + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override + { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); } virtual void clearGraphProgress() const { c->clearGraphProgress(); } virtual IStringVal & getAbortBy(IStringVal & str) const @@ -10260,9 +10363,9 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W { throwUnexpected(); // Should only be used for persisted workunits } -IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const +IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const { - return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge); + return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats); } void CLocalWUGraph::setName(const char *str) diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index 3769438de71..7b96b2cc1fc 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator }; //--------------------------------------------------------------------------------------------------------------------- - //! IWorkUnit //! Provides high level access to WorkUnit "header" data. interface IWorkUnit; @@ -1204,6 +1203,7 @@ interface IConstWorkUnitInfo : extends IInterface virtual IConstWUAppValueIterator & getApplicationValues() const = 0; }; +class GlobalStatisticCollection; interface IConstWorkUnit : extends IConstWorkUnitInfo { virtual bool aborting() const = 0; @@ -1302,7 +1302,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0; virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const = 0; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const = 0; virtual void clearGraphProgress() const = 0; virtual IStringVal & getAbortBy(IStringVal & str) const = 0; virtual unsigned __int64 getAbortTimeStamp() const = 0; @@ -1725,7 +1725,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer); extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); -extern WORKUNIT_API void updateAggregates(IWorkUnit *wu); extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); extern WORKUNIT_API void descheduleWorkunit(char const * wuid); #if 0 @@ -1784,4 +1783,22 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName); + +class WORKUNIT_API GlobalStatisticCollection : public CInterface +{ +public: + GlobalStatisticCollection(); + + void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly); + void loadGlobalAggregates(IConstWorkUnit &workunit); + IStatisticCollection * getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats); + bool refreshAggregates(); + IStatisticCollection * queryCollection() { return statsCollection; } + void updateAggregates(IWorkUnit *wu); +private: + Owned statsCollection; + std::vector aggregateKinds = {StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}; + StringBuffer wuid; +}; + #endif diff --git a/common/workunit/workunit.ipp b/common/workunit/workunit.ipp index 4f166ea67fb..85cfc396693 100644 --- a/common/workunit/workunit.ipp +++ b/common/workunit/workunit.ipp @@ -232,7 +232,7 @@ public: virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const; virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const; virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const; - virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override; + virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override; void clearGraphProgress() const; virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit. @@ -661,7 +661,7 @@ public: class WORKUNIT_API CWuGraphStats : public CInterfaceOf { public: - CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge); + CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats); virtual void beforeDispose(); virtual IStatisticGatherer & queryStatsBuilder(); protected: diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 61314717e6c..b2b1b356a95 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1988,7 +1988,6 @@ void EclAgent::doProcess() const cost_type cost = aggregateCost(w, nullptr, false); if (cost) w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(w); addTimings(w); switch (w->getState()) @@ -2510,7 +2509,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(wu); } void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item) diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 10c86cc6ebb..ae2220d37e5 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -1048,6 +1048,7 @@ public: graphAgentContext.set(&_agent); agent = &graphAgentContext; aborted = false; + globalStats.load(*wu, graphName, true, true); } void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml); @@ -1055,7 +1056,7 @@ public: void executeLibrary(const byte * parentExtract, IHThorGraphResults * results); IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph); void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value); - + void updateAggregates(IWorkUnit* lockedwu); EclSubGraph * idToGraph(unsigned id); EclGraphElement * idToActivity(unsigned id); const char *queryGraphName() { return graphName; } @@ -1083,6 +1084,7 @@ protected: IProbeManager * probeManager; unsigned wfid; bool aborted; + GlobalStatisticCollection globalStats; }; diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index eacfad7f992..a51517a4c10 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress() Owned progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id); IStatisticGatherer & stats = progress->queryStatsBuilder(); updateProgress(stats); - if (startGraphTime || elapsedGraphCycles) { WorkunitUpdate lockedwu(agent->updateWorkUnit()); + parent.updateAggregates(lockedwu); StringBuffer subgraphid; subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id); if (startGraphTime) @@ -1278,8 +1278,6 @@ void EclGraph::execute(const byte * parentExtract) const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - - updateAggregates(wu); } if (agent->queryRemoteWorkunit()) @@ -1348,7 +1346,7 @@ void EclGraph::updateLibraryProgress() { EclSubGraph & cur = graphs.item(idx); unsigned wfid = cur.parent.queryWfid(); - Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false); + Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, &globalStats); cur.updateProgress(progress->queryStatsBuilder()); } } @@ -1491,7 +1489,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false); + return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, &globalStats); +} + +void EclGraph::updateAggregates(IWorkUnit* lockedwu) +{ + globalStats.updateAggregates(lockedwu); } void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) diff --git a/plugins/cassandra/cassandrawu.cpp b/plugins/cassandra/cassandrawu.cpp index 6315ad7800b..19c89b89df9 100644 --- a/plugins/cassandra/cassandrawu.cpp +++ b/plugins/cassandra/cassandrawu.cpp @@ -2743,8 +2743,8 @@ class CCassandraWorkUnit : public CPersistedWorkUnit class CCassandraWuGraphStats : public CWuGraphStats { public: - CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) - : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), + CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats) + : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), progress(createPTree(_rootScope)), parent(_parent) { } @@ -2764,9 +2764,9 @@ class CCassandraWorkUnit : public CPersistedWorkUnit StringAttr wuid; }; - IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge) const override + IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override { - return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge); + return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index ec62349dd7b..afe617f1e13 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1785,6 +1785,25 @@ class SortedCollectionIterator : public ArrayIIteratorOf { friend class CollectionHashTable; + + // deserialize without storing the stats + void deserializeNoStats(MemoryBuffer & in, unsigned version) + { + unsigned numStats; + in.read(numStats); + while (numStats-- > 0) + Statistic next (in, version); + unsigned numChildren; + in.read(numChildren); + while (numChildren-- > 0) + { + byte kind; + in.read(kind); + StatsScopeId childId; + childId.deserialize(in, version); + deserializeNoStats(in, version); + } + } public: CStatisticCollection(IStatisticCollection * _parent=nullptr) : parent(_parent) {} @@ -1795,7 +1814,7 @@ class CStatisticCollection : public CInterfaceOf CStatisticCollection(IStatisticCollection * _parent, MemoryBuffer & in, unsigned version) : parent(_parent) { id.deserialize(in, version); - deserialize(in, version); + deserialize(in, version, 0, INT_MAX); } virtual byte getCollectionType() const { return SCintermediate; } @@ -1819,10 +1838,11 @@ class CStatisticCollection : public CInterfaceOf } virtual StringBuffer & getFullScope(StringBuffer & str) const override { - if (parent && queryScopeType()!=SSTworkflow) + if (parent && queryScopeType()!=SSTglobal) { parent->getFullScope(str); - str.append(':'); + if (!str.isEmpty()) + str.append(':'); } id.getScopeText(str); return str; @@ -1850,6 +1870,25 @@ class CStatisticCollection : public CInterfaceOf } return false; } + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 & value) override + { + if (*scope=='\0') + { + updateStatistic(kind, value, StatsMergeReplace); + return true; + } + else + { + StatsScopeId childScopeId; + if (!childScopeId.setScopeText(scope, &scope) || (*scope!=':' && *scope!='\0')) + throw makeStringExceptionV(JLIBERR_UnexpectedValue, "'%s' does not appear to be a valid scope id", scope); + IStatisticCollection * child = ensureSubScope(childScopeId, true); + + if (*scope==':') + scope++; + return child->setStatistic(scope, kind, value); + } + } virtual unsigned getNumStatistics() const override { return stats.ordinality(); @@ -1910,7 +1949,7 @@ class CStatisticCollection : public CInterfaceOf stats.append(s); } - virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) override { if (mergeAction != StatsMergeAppend) { @@ -1919,13 +1958,19 @@ class CStatisticCollection : public CInterfaceOf Statistic & cur = stats.element(i); if (cur.kind == kind) { + if (mergeAction==StatsMergeReplace) + { + if (cur.value==value) + return false; + } cur.value = mergeStatisticValue(cur.value, value, mergeAction); - return; + return true; } } } Statistic s(kind, value); stats.append(s); + return true; } virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override @@ -1940,7 +1985,27 @@ class CStatisticCollection : public CInterfaceOf return ret; } - virtual void serialize(MemoryBuffer & out) const + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path) override + { + IStatisticCollection * curScope = this; + for (auto it = path.begin(); it != path.end(); ++it) + curScope = curScope->ensureSubScope(*it, true); // n.b. this will always return a valid pointer + return curScope; + } + + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) override + { + IStatisticCollection * curScope = this; + for (auto it = path.begin(); it != path.end(); ++it) + { + curScope = children.find(&(*it)); + if (!curScope) + return nullptr; + } + return curScope; + } + + virtual void serialize(MemoryBuffer & out) const override { out.append(getCollectionType()); id.serialize(out); @@ -1955,7 +2020,7 @@ class CStatisticCollection : public CInterfaceOf iter.query().serialize(out); } - virtual void deserialize(MemoryBuffer & in, unsigned version) override + virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override { unsigned numStats; in.read(numStats); @@ -1963,9 +2028,9 @@ class CStatisticCollection : public CInterfaceOf while (numStats-- > 0) { Statistic next (in, version); - stats.append(next); + if (minDepth <= 0) + stats.append(next); } - unsigned numChildren; in.read(numChildren); children.ensure(numChildren); @@ -1975,10 +2040,23 @@ class CStatisticCollection : public CInterfaceOf in.read(kind); StatsScopeId childId; childId.deserialize(in, version); - IStatisticCollection * collection = ensureSubScope(childId, true); - collection->deserialize(in, version); + deserializeChild(childId, in, version, minDepth, maxDepth); } } + + virtual void deserializeChild(StatsScopeId childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override + { + if (maxDepth > 0) + { + IStatisticCollection * childCollection = ensureSubScope(childId, true); + childCollection->deserialize(in, version, (minDepth-1), (maxDepth-1)); + } + else + { + deserializeNoStats(in, version); + } + } + inline const StatsScopeId & queryScopeId() const { return id; } virtual void mergeInto(IStatisticGatherer & target) const @@ -2011,7 +2089,7 @@ class CStatisticCollection : public CInterfaceOf assertex(aggregateKinds.size()==totals.size()); bool updated = false; - if (queryScopeType()==SSTsubgraph) + if (queryScopeType()==SSTsubgraph || isDirty==false) { ForEachItemIn(i, stats) { @@ -2020,9 +2098,9 @@ class CStatisticCollection : public CInterfaceOf auto iteratorVec = std::find(aggregateKinds.begin(), aggregateKinds.end(), kind); if (iteratorVec!=aggregateKinds.end()) { - unsigned pos = iteratorVec-aggregateKinds.begin(); StatsMergeAction mergeAction = queryMergeMode(kind); - totals[pos] = mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction); + unsigned pos = iteratorVec-aggregateKinds.begin(); + totals[pos] += mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction); updated = true; } } @@ -2037,30 +2115,49 @@ class CStatisticCollection : public CInterfaceOf } if (updated) { + updated=false; std::vector::iterator totalIter = totals.begin(); std::vector::iterator subTotalIter = childTotals.begin(); std::vector::iterator kindIter = aggregateKinds.begin(); while (totalIter != totals.end()) { + if (updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace)) + updated=true; StatsMergeAction mergeAction = queryMergeMode(*kindIter); - updateStatistic(*kindIter, *subTotalIter, mergeAction); - (*totalIter) = mergeStatisticValue(*totalIter, *subTotalIter, mergeAction); - + (*totalIter) += mergeStatisticValue(*totalIter, *subTotalIter, mergeAction); ++totalIter; ++subTotalIter; ++kindIter; } } + isDirty=false; } return updated; } - + virtual void clearStats() override + { + stats.clear(true); + // Note: Children should NOT be deleted as all pointers return by ensureSubScope must still be valid + SuperHashIteratorOf iter(children, false); + for (iter.first(); iter.isValid(); iter.next()) + iter.query().clearStats(); + } + virtual void addChild(IStatisticCollection *stats) override + { + children.add(*LINK(stats)); + } + virtual void markDirty() override + { + isDirty=true; + if (parent) parent->markDirty(); + } private: StatsScopeId id; IStatisticCollection * parent; protected: CollectionHashTable children; StatsArray stats; + bool isDirty = false; // used to track which scope has changed (used to workout what aggregates to recalculate) }; StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const @@ -2132,8 +2229,14 @@ bool CollectionHashTable::matchesElement(const void *et, const void *searchET) c class CRootStatisticCollection : public CStatisticCollection { public: - CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & _id) - : CStatisticCollection(nullptr, _id), creatorType(_creatorType), creator(_creator) + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId, const StatsScopeId & graphScopeId, IStatisticCollection * sgCollection) + : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) + { + whenCreated = getTimeStampNowValue(); + IStatisticCollection * child = ensureSubScope(graphScopeId, true); + child->addChild(sgCollection); + } + CRootStatisticCollection(StatisticCreatorType _creatorType, const char * _creator, const StatsScopeId & rootScopeId) : CStatisticCollection(nullptr, rootScopeId), creatorType(_creatorType), creator(_creator) { whenCreated = getTimeStampNowValue(); } @@ -2146,13 +2249,13 @@ class CRootStatisticCollection : public CStatisticCollection in.read(whenCreated); } - virtual byte getCollectionType() const { return SCroot; } + virtual byte getCollectionType() const override { return SCroot; } - virtual unsigned __int64 queryWhenCreated() const + virtual unsigned __int64 queryWhenCreated() const override { return whenCreated; } - virtual void serialize(MemoryBuffer & out) const + virtual void serialize(MemoryBuffer & out) const override { CStatisticCollection::serialize(out); out.append((byte)creatorType); @@ -2174,52 +2277,6 @@ class CRootStatisticCollection : public CStatisticCollection unsigned __int64 whenCreated; }; - -StatsScopeId globalScopeId(SSTglobal, (unsigned)0); -class GlobalStatisticCollection : public CStatisticCollection -{ -public: - GlobalStatisticCollection(IPropertyTree * root) : CStatisticCollection(nullptr, globalScopeId) - { - Owned iter = root->getElements("*"); - ForEach(*iter) - { - IPropertyTree * graphPT = &iter->query(); - - Owned iter2 = graphPT->getElements("./*"); - ForEach(*iter2) - { - IPropertyTree * sgPT = & iter2->query(); - const char * sgName = sgPT->queryName(); - if (strcmp(sgName, "node")==0) - continue; - assertex(strncmp(sgName, "sg", 2)==0); - MemoryBuffer compressed; - sgPT->getPropBin("Stats", compressed); - if (!compressed.length()) - return; - - MemoryBuffer serialized; - decompressToBuffer(serialized, compressed); - unsigned version; - serialized.read(version); - byte kind; - serialized.read(kind); - - StatsScopeId id; - id.deserialize(serialized, version); - IStatisticCollection * collection = ensureSubScope(id, true); - collection->deserialize(serialized, version); - } - } - } -}; - -IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root) -{ - return new GlobalStatisticCollection(root); -} - class StatAggregator : implements IStatisticVisitor { public: @@ -2295,6 +2352,16 @@ IStatisticCollection * createStatisticCollection(MemoryBuffer & in) return deserializeCollection(NULL, in, version); } +IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, StatsScopeId scopeId) +{ + return new CStatisticCollection(parent, scopeId); +} + +IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScopeId, IStatisticCollection * childCollection) +{ + //creator unused at the moment. + return new CRootStatisticCollection(creatorType, creator, rootScope, graphScopeId, childCollection); +} //-------------------------------------------------------------------------------------------------------------------- @@ -2366,9 +2433,12 @@ class StatisticGatherer : implements CInterfaceOf extern IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope) { - //creator unused at the moment. - Owned rootCollection = new CRootStatisticCollection(creatorType, creator, rootScope); - return new StatisticGatherer(rootCollection); + return new StatisticGatherer(new CRootStatisticCollection(creatorType, creator, rootScope)); +} + +extern IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats) +{ + return new StatisticGatherer(stats); } //-------------------------------------------------------------------------------------------------------------------- @@ -3932,9 +4002,9 @@ void StatisticsFilter::setScopeDepth(unsigned _scopeDepth) scopeFilter.setDepth(_scopeDepth); } -void StatisticsFilter::setScopeDepth(unsigned _minScopeDepth, unsigned _maxScopeDepth) +void StatisticsFilter::setScopeDepth(unsigned _maxDepthDepth, unsigned _maxScopeDepth) { - scopeFilter.setDepth(_minScopeDepth, _maxScopeDepth); + scopeFilter.setDepth(_maxDepthDepth, _maxScopeDepth); } void StatisticsFilter::setScope(const char * _scope) diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 2508fc6524b..b044a9e47a2 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -126,6 +126,7 @@ interface IStatisticCollection : public IInterface virtual unsigned getNumStatistics() const = 0; virtual bool getStatistic(StatisticKind kind, unsigned __int64 & value) const = 0; virtual void getStatistic(StatisticKind & kind, unsigned __int64 & value, unsigned idx) const = 0; + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 & value) = 0; virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) = 0; virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const = 0; virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const = 0; @@ -136,10 +137,16 @@ interface IStatisticCollection : public IInterface virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path) = 0; + virtual IStatisticCollection * querySubScopePath(std::initializer_list path) = 0; virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; - virtual void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; + virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; virtual bool refreshAggregates(std::vector & aggregateKinds, std::vector & totals) = 0; - virtual void deserialize(MemoryBuffer & in, unsigned version) = 0; + virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; + virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; + virtual void clearStats() = 0; + virtual void addChild(IStatisticCollection *stats) = 0; + virtual void markDirty() = 0; }; interface IStatisticCollectionIterator : public IIteratorOf @@ -402,8 +409,8 @@ class jlib_decl StatisticsFilter : public CInterface void setCreator(const char * _creator); void setCreatorType(StatisticCreatorType _creatorType); void setFilter(const char * filter); - void setScopeDepth(unsigned _minScopeDepth); - void setScopeDepth(unsigned _minScopeDepth, unsigned _maxScopeDepth); + void setScopeDepth(unsigned _maxDepthDepth); + void setScopeDepth(unsigned _maxDepthDepth, unsigned _maxScopeDepth); void setScope(const char * _scope); void setScopeType(StatisticScopeType _scopeType); void setValueRange(unsigned __int64 minValue, unsigned __int64 _maxValue); @@ -889,9 +896,12 @@ extern jlib_decl StatisticCreatorType queryCreatorType(const char * sct, Statist extern jlib_decl StatisticScopeType queryScopeType(const char * sst, StatisticScopeType dft); extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope); +extern jlib_decl IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats); +extern jlib_decl IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScope, IStatisticCollection * childCollection=nullptr); extern jlib_decl void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection); extern jlib_decl IStatisticCollection * createStatisticCollection(MemoryBuffer & in); -extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root, std::vector & aggregateKinds); +extern jlib_decl IStatisticCollection * createStatisticCollection(IStatisticCollection * parent, StatsScopeId scopeId); +// extern jlib_decl IStatisticCollection * createGlobalStatisticCollection(IPropertyTree * root, std::vector & aggregateKinds); inline unsigned __int64 milliToNano(unsigned __int64 value) { return value * 1000000; } // call avoids need to upcast values inline unsigned __int64 nanoToMilli(unsigned __int64 value) { return value / 1000000; } diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index e96a4ae07c9..7e2e7c09371 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -43,6 +43,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned numberOfMachines = 0; cost_type costLimit = 0; cost_type workunitCost = 0; + GlobalStatisticCollection globalStatsCollection; void doReportGraph(IStatisticGatherer & stats, CGraphBase *graph) { @@ -146,7 +147,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer ForEachItemIn (g, activeGraphs) { CGraphBase &graph = activeGraphs.item(g); - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, &globalStatsCollection); reportGraph(stats->queryStatsBuilder(), &graph); } Owned wu = ¤tWU.lock(); @@ -156,6 +157,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); } + updateAggregates(wu); queryServerStatus().commitProperties(); } catch (IException *E) @@ -174,7 +176,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName(); unsigned wfid = graph->queryJob().getWfid(); { - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, &globalStatsCollection); reportGraph(stats->queryStatsBuilder(), graph); } @@ -187,7 +189,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StWhenStarted, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend); } reportStatus(wu, *graph, startTime, finished, success); - queryServerStatus().commitProperties(); } catch (IException *e) @@ -301,6 +302,14 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer reportActiveGraphs(true, false); activeGraphs.kill(); } + virtual void updateAggregates(IWorkUnit * lockedWu) override + { + globalStatsCollection.updateAggregates(lockedWu); + } + virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly) override + { + globalStatsCollection.load(workunit, graphName, aggregatesOnly, missingScopesOnly); + } }; diff --git a/thorlcr/master/thdemonserver.hpp b/thorlcr/master/thdemonserver.hpp index 32453c92764..69cd8c8fa14 100644 --- a/thorlcr/master/thdemonserver.hpp +++ b/thorlcr/master/thdemonserver.hpp @@ -24,12 +24,15 @@ interface IWUGraphProgress; class CGraphBase; +interface IConstWorkUnit; interface IDeMonServer : extends IInterface { virtual void takeHeartBeat(MemoryBuffer &progressMbb) = 0; virtual void startGraph(CGraphBase *graph) = 0; virtual void endGraph(CGraphBase *graph, bool success) = 0; virtual void endGraphs() = 0; + virtual void updateAggregates(IWorkUnit * lockedWu) = 0; + virtual void loadStats(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly) = 0; }; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index b249d301b4b..689f434a477 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1117,6 +1117,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, if (isContainerized() && podInfo.hasStdDev()) podInfo.report(wu); } + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->loadStats(workunit, graphName, true, true); setWuid(workunit.queryWuid(), workunit.queryClusterName()); @@ -1134,7 +1136,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateAggregates(wu); + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->updateAggregates(wu); + removeJob(*job); } catch (IException *e) diff --git a/vcpkg b/vcpkg index 1bc64df4225..4c3f6e34fe1 160000 --- a/vcpkg +++ b/vcpkg @@ -1 +1 @@ -Subproject commit 1bc64df4225a58259ac752adcdcbbb8bee243d2f +Subproject commit 4c3f6e34fe1e206e327000888442a3b65e82c3ec