Skip to content

Commit

Permalink
HPCC-29657 Improve performance by updating aggregates intermittantly …
Browse files Browse the repository at this point in the history
…and at end of each graph

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 4, 2023
1 parent 0c3726d commit 4930b80
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 9 deletions.
2 changes: 0 additions & 2 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,6 @@ void EclAgent::doProcess()
const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
updateAggregates(w);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2534,7 +2533,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
updateAggregates(wu);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
3 changes: 2 additions & 1 deletion ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ public:
void executeLibrary(const byte * parentExtract, IHThorGraphResults * results);
IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph);
void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value);

void updateAggregates(IWorkUnit* lockedwu);
EclSubGraph * idToGraph(unsigned id);
EclGraphElement * idToActivity(unsigned id);
const char *queryGraphName() { return graphName; }
Expand Down Expand Up @@ -1083,6 +1083,7 @@ protected:
IProbeManager * probeManager;
unsigned wfid;
bool aborted;
GlobalStatisticCollection statsCache;
};


Expand Down
11 changes: 7 additions & 4 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,10 +879,10 @@ void EclSubGraph::updateProgress()
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id);
IStatisticGatherer & stats = progress->queryStatsBuilder();
updateProgress(stats);

if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
updateAggregates(lockedwu);
StringBuffer subgraphid;
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
if (startGraphTime)
Expand Down Expand Up @@ -1278,8 +1278,6 @@ void EclGraph::execute(const byte * parentExtract)
const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);

updateAggregates(wu);
}

if (agent->queryRemoteWorkunit())
Expand Down Expand Up @@ -1491,7 +1489,12 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false);
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, &statsCache);
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
{
::updateAggregates(lockedwu, &statsCache);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down
7 changes: 5 additions & 2 deletions thorlcr/master/thdemonserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
unsigned startTime = graphStarts.item(g2);
reportStatus(wu, graph, startTime, finished, success);
}
updateAggregates(wu, &statsCache);
::updateAggregates(wu, &statsCache);
queryServerStatus().commitProperties();
}
catch (IException *E)
Expand Down Expand Up @@ -189,7 +189,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StWhenStarted, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend);
}
reportStatus(wu, *graph, startTime, finished, success);

queryServerStatus().commitProperties();
}
catch (IException *e)
Expand Down Expand Up @@ -303,6 +302,10 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer
reportActiveGraphs(true, false);
activeGraphs.kill();
}
virtual void updateAggregates(IWorkUnit * lockedWu) override
{
::updateAggregates(lockedWu, &statsCache);
}
};


Expand Down
1 change: 1 addition & 0 deletions thorlcr/master/thdemonserver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ interface IDeMonServer : extends IInterface
virtual void startGraph(CGraphBase *graph) = 0;
virtual void endGraph(CGraphBase *graph, bool success) = 0;
virtual void endGraphs() = 0;
virtual void updateAggregates(IWorkUnit * lockedWu) = 0;
};


Expand Down
2 changes: 2 additions & 0 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
if (globals->getPropBool("@watchdogProgressEnabled"))
queryDeMonServer()->updateAggregates(wu);
removeJob(*job);
}
catch (IException *e)
Expand Down

0 comments on commit 4930b80

Please sign in to comment.