diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index a016679fecf..9d142312269 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext virtual bool forceNewDiskReadActivity() const = 0; virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0; virtual double queryAgentMachineCost() const = 0; + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0; + virtual void updateAggregates(IWorkUnit* lockedwu) = 0; }; #endif // AGENTCTX_HPP_INCL diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index b2b1b356a95..ecc1f08856b 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1985,9 +1985,6 @@ void EclAgent::doProcess() const __int64 elapsedNs = elapsedTimer.elapsedNs(); updateWorkunitStat(w, SSTglobal, NULL, StTimeElapsed, nullptr, elapsedNs); - const cost_type cost = aggregateCost(w, nullptr, false); - if (cost) - w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); addTimings(w); switch (w->getState()) diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 20b73dced30..db29e9292a8 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -250,6 +250,14 @@ public: { return ctx->queryAgentMachineCost(); }; + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override + { + return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph); + }; + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + ctx->updateAggregates(lockedwu); + } protected: IAgentContext * ctx; @@ -392,6 +400,7 @@ private: Owned outputSerializer; int retcode; double agentMachineCost = 0; + GlobalStatisticCollection globalStats; private: void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val); @@ -705,6 +714,15 @@ public: { return agentMachineCost; } + virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override + { + Owned sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats + return wuRead->updateStats (graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection); + } + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + globalStats.updateAggregates(lockedwu); + } }; //--------------------------------------------------------------------------- @@ -1048,7 +1066,6 @@ public: graphAgentContext.set(&_agent); agent = &graphAgentContext; aborted = false; - globalStats.load(*wu, graphName, true); } void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml); @@ -1084,7 +1101,6 @@ protected: IProbeManager * probeManager; unsigned wfid; bool aborted; - GlobalStatisticCollection globalStats; }; diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index ced91432092..17eefa66907 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -1347,8 +1347,7 @@ void EclGraph::updateLibraryProgress() EclSubGraph & cur = graphs.item(idx); unsigned wfid = cur.parent.queryWfid(); - Owned sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id, true); // true=>clear existing stats - Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, sgCollection.getClear()); + Owned progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id); cur.updateProgress(progress->queryStatsBuilder()); } } @@ -1491,13 +1490,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - Owned sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, queryGraphName(), subgraph, true); // true=>clear existing stats - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, sgCollection.getClear()); + return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph); } void EclGraph::updateAggregates(IWorkUnit* lockedwu) { - globalStats.updateAggregates(lockedwu); + agent->updateAggregates(lockedwu); } void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) @@ -1549,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa Owned eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid()); eclGraph->createFromXGMML(dll, xgmml); + globalStats.load(*wu, graphName, true); return eclGraph.getClear(); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index bfe07007664..e613b45d9f1 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1330,7 +1330,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries}); const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles}); const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles}); -const StatisticsMapping aggregateKindStatistics({StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); +const StatisticsMapping aggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode) { @@ -1872,8 +1872,7 @@ class CStatisticCollection : public CInterfaceOf { if (*scope=='\0') { - updateStatistic(kind, value, StatsMergeReplace); - return true; + return updateStatistic(kind, value, StatsMergeReplace); } else { @@ -1970,7 +1969,7 @@ class CStatisticCollection : public CInterfaceOf stats.append(s); return true; } - virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override { bool wasCreated; return ensureSubScope(search, hasChildren, wasCreated); @@ -1988,7 +1987,8 @@ class CStatisticCollection : public CInterfaceOf wasCreated = true; return ret; } - virtual IStatisticCollection * querySubScope(const StatsScopeId & search) override + + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const override { return children.find(&search); } @@ -2008,7 +2008,7 @@ class CStatisticCollection : public CInterfaceOf { curScope = curScope->querySubScope(scopeItem); if (!curScope) - break; + return nullptr; } return curScope; } diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index edf369ffcaf..0855c1136a6 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -141,8 +141,8 @@ interface IStatisticCollection : public IInterface virtual void visitChildren(IStatisticVisitor & target) const = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) = 0; - virtual IStatisticCollection * querySubScope(const StatsScopeId & search) = 0; virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool & wasCreated) = 0; + virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const = 0; virtual IStatisticCollection * querySubScopePath(std::initializer_list path) = 0; virtual void pruneChildStats() = 0; virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index 0c2b12ddb4c..7c66ce7c0e9 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -144,7 +144,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { CGraphBase &graph = activeGraphs.item(g); Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph.queryGraphId(), true); // true=>clear existing stats - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection.getClear()); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection); reportGraph(stats->queryStatsBuilder(), &graph); } Owned wu = ¤tWU.lock(); @@ -153,8 +153,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer CGraphBase &graph = activeGraphs.item(g2); unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); - // Prune subgraph descendant stats as they have been serialized and no longer needed. - globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId()); } updateAggregates(wu); queryServerStatus().commitProperties(); @@ -176,11 +174,9 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned wfid = graph->queryJob().getWfid(); { Owned sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph->queryGraphId(), true); // true=>clear existing stats - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection.getClear()); + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection); reportGraph(stats->queryStatsBuilder(), graph); } - // Prune subgraph descendant stats as they have been serialized and no longer needed. - globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId()); Owned wu = ¤tWU.lock(); if (startTimeStamp) { @@ -293,6 +289,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { unsigned startTime = graphStarts.item(g); reportGraph(graph, true, success, startTime, 0); + // Prune subgraph descendant stats as they have been serialized and no longer needed. + const char *graphName = ((CJobMaster &)graph->queryJob()).queryGraphName(); + unsigned wfid = graph->queryJob().getWfid(); + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId()); activeGraphs.remove(g); graphStarts.remove(g); } @@ -301,6 +301,18 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer { synchronized block(mutex); reportActiveGraphs(true, false); + if (activeGraphs.ordinality()) + { + + CJobBase & activeJob = activeGraphs.item(0).queryJob(); + const char *graphName = ((CJobMaster &)activeJob).queryGraphName(); + unsigned wfid = activeJob.getWfid(); + ForEachItemIn (g, activeGraphs) + { + CGraphBase &graph = activeGraphs.item(g); + globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId()); + } + } activeGraphs.kill(); } virtual void updateAggregates(IWorkUnit * lockedWu) override