Skip to content

Commit

Permalink
HPCC-29657 Create global stats in AgentContext so it's shared with al…
Browse files Browse the repository at this point in the history
…l graphs and other minor changes

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 30, 2023
1 parent e468996 commit 0a66cb0
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 23 deletions.
2 changes: 2 additions & 0 deletions ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext
virtual bool forceNewDiskReadActivity() const = 0;
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) = 0;
virtual void updateAggregates(IWorkUnit* lockedwu) = 0;
};

#endif // AGENTCTX_HPP_INCL
3 changes: 0 additions & 3 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1985,9 +1985,6 @@ void EclAgent::doProcess()
const __int64 elapsedNs = elapsedTimer.elapsedNs();
updateWorkunitStat(w, SSTglobal, NULL, StTimeElapsed, nullptr, elapsedNs);

const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
addTimings(w);

switch (w->getState())
Expand Down
20 changes: 18 additions & 2 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ public:
{
return ctx->queryAgentMachineCost();
};
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
return ctx->updateStats(creatorType, creator, activeWfid, graphName, subgraph);
};
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
ctx->updateAggregates(lockedwu);
}

protected:
IAgentContext * ctx;
Expand Down Expand Up @@ -392,6 +400,7 @@ private:
Owned<IOrderedOutputSerializer> outputSerializer;
int retcode;
double agentMachineCost = 0;
GlobalStatisticCollection globalStats;

private:
void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
Expand Down Expand Up @@ -705,6 +714,15 @@ public:
{
return agentMachineCost;
}
virtual IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, const char *graphName, unsigned subgraph) override
{
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, graphName, subgraph, true); // true=>clear existing stats
return wuRead->updateStats (graphName, creatorType, creator, activeWfid, subgraph, false, sgCollection);
}
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
globalStats.updateAggregates(lockedwu);
}
};

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -1048,7 +1066,6 @@ public:
graphAgentContext.set(&_agent);
agent = &graphAgentContext;
aborted = false;
globalStats.load(*wu, graphName, true);
}

void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml);
Expand Down Expand Up @@ -1084,7 +1101,6 @@ protected:
IProbeManager * probeManager;
unsigned wfid;
bool aborted;
GlobalStatisticCollection globalStats;
};


Expand Down
9 changes: 4 additions & 5 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1347,8 +1347,7 @@ void EclGraph::updateLibraryProgress()
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();

Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id, true); // true=>clear existing stats
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, sgCollection.getClear());
Owned<IWUGraphStats> progress = agent->updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id);
cur.updateProgress(progress->queryStatsBuilder());
}
}
Expand Down Expand Up @@ -1491,13 +1490,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, queryGraphName(), subgraph, true); // true=>clear existing stats
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, sgCollection.getClear());
return agent->updateStats(creatorType, creator, activeWfid, queryGraphName(), subgraph);
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
{
globalStats.updateAggregates(lockedwu);
agent->updateAggregates(lockedwu);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down Expand Up @@ -1549,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa

Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid());
eclGraph->createFromXGMML(dll, xgmml);
globalStats.load(*wu, graphName, true);
return eclGraph.getClear();
}

Expand Down
12 changes: 6 additions & 6 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk
const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles});
const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles});
const StatisticsMapping aggregateKindStatistics({StCostFileAccess, StSizeGraphSpill, StSizeSpillFile});
const StatisticsMapping aggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile});

const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode)
{
Expand Down Expand Up @@ -1872,8 +1872,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
{
if (*scope=='\0')
{
updateStatistic(kind, value, StatsMergeReplace);
return true;
return updateStatistic(kind, value, StatsMergeReplace);
}
else
{
Expand Down Expand Up @@ -1970,7 +1969,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
stats.append(s);
return true;
}
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren)
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override
{
bool wasCreated;
return ensureSubScope(search, hasChildren, wasCreated);
Expand All @@ -1988,7 +1987,8 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
wasCreated = true;
return ret;
}
virtual IStatisticCollection * querySubScope(const StatsScopeId & search) override

virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const override
{
return children.find(&search);
}
Expand All @@ -2008,7 +2008,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection>
{
curScope = curScope->querySubScope(scopeItem);
if (!curScope)
break;
return nullptr;
}
return curScope;
}
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ interface IStatisticCollection : public IInterface
virtual void visitChildren(IStatisticVisitor & target) const = 0;
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0;
virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool & wasCreated) = 0;
virtual IStatisticCollection * querySubScope(const StatsScopeId & search) = 0;
virtual IStatisticCollection * ensureSubScopePath(std::initializer_list<const StatsScopeId> path, bool & wasCreated) = 0;
virtual IStatisticCollection * querySubScope(const StatsScopeId & search) const = 0;
virtual IStatisticCollection * querySubScopePath(std::initializer_list<const StatsScopeId> path) = 0;
virtual void pruneChildStats() = 0;
virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0;
Expand Down
24 changes: 18 additions & 6 deletions thorlcr/master/thdemonserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
{
CGraphBase &graph = activeGraphs.item(g);
Owned<IStatisticCollection> sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph.queryGraphId(), true); // true=>clear existing stats
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection.getClear());
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false, sgCollection);
reportGraph(stats->queryStatsBuilder(), &graph);
}
Owned<IWorkUnit> wu = &currentWU.lock();
Expand All @@ -153,8 +153,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
CGraphBase &graph = activeGraphs.item(g2);
unsigned startTime = graphStarts.item(g2);
reportStatus(wu, graph, startTime, finished, success);
// Prune subgraph descendant stats as they have been serialized and no longer needed.
globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId());
}
updateAggregates(wu);
queryServerStatus().commitProperties();
Expand All @@ -176,11 +174,9 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
unsigned wfid = graph->queryJob().getWfid();
{
Owned<IStatisticCollection> sgCollection = globalStatsCollection.getCollectionForUpdate(SCTthor, queryStatisticsComponentName(), wfid, graphName, graph->queryGraphId(), true); // true=>clear existing stats
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection.getClear());
Owned<IWUGraphStats> stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false, sgCollection);
reportGraph(stats->queryStatsBuilder(), graph);
}
// Prune subgraph descendant stats as they have been serialized and no longer needed.
globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId());
Owned<IWorkUnit> wu = &currentWU.lock();
if (startTimeStamp)
{
Expand Down Expand Up @@ -293,6 +289,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
{
unsigned startTime = graphStarts.item(g);
reportGraph(graph, true, success, startTime, 0);
// Prune subgraph descendant stats as they have been serialized and no longer needed.
const char *graphName = ((CJobMaster &)graph->queryJob()).queryGraphName();
unsigned wfid = graph->queryJob().getWfid();
globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph->queryGraphId());
activeGraphs.remove(g);
graphStarts.remove(g);
}
Expand All @@ -301,6 +301,18 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
{
synchronized block(mutex);
reportActiveGraphs(true, false);
if (activeGraphs.ordinality())
{

CJobBase & activeJob = activeGraphs.item(0).queryJob();
const char *graphName = ((CJobMaster &)activeJob).queryGraphName();
unsigned wfid = activeJob.getWfid();
ForEachItemIn (g, activeGraphs)
{
CGraphBase &graph = activeGraphs.item(g);
globalStatsCollection.pruneSubGraphDescendants(wfid, graphName, graph.queryGraphId());
}
}
activeGraphs.kill();
}
virtual void updateAggregates(IWorkUnit * lockedWu) override
Expand Down

0 comments on commit 0a66cb0

Please sign in to comment.