Skip to content

Commit

Permalink
HPCC-29657 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 24, 2023
1 parent 29f0602 commit 49fde9e
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 65 deletions.
59 changes: 33 additions & 26 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,15 @@ void doDescheduleWorkkunit(char const * wuid)
* Graph progress support
*/

CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * globalStatsCollection)
CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * statsCollection)
: creatorType(_creatorType), creator(_creator), id(_id), merge(_merge)
{
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(_rootScope));
StatsScopeId wfScopeId(SSTworkflow,wfid);

if (globalStatsCollection)
{
StatsScopeId sgScopeId(SSTsubgraph, id);
collector.setown(createStatisticsGatherer(globalStatsCollection->getCollectionForUpdate(wfScopeId, graphScopeId, sgScopeId, _creatorType, _creator, true)));
}
if (statsCollection)
collector.setown(createStatisticsGatherer(statsCollection));
else
collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId));
collector->beginScope(graphScopeId);
Expand Down Expand Up @@ -2672,7 +2669,7 @@ GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(a
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
}

void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly)
void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly)
{
const char * _wuid = workunit.queryWuid();
if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit
Expand Down Expand Up @@ -2702,14 +2699,6 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap
if (strcmp(sgName, "node")==0)
continue;
verifyex(sgScopeId.setScopeText(sgName));

if (missingScopesOnly) // Skip scopes that already in collection
{
IStatisticCollection * sgCollection = statsCollection->querySubScopePath({wfScopeId, graphScopeId, sgScopeId});
if (sgCollection)
continue;
}

MemoryBuffer compressed;
sgPT->getPropBin("Stats", compressed);
if (!compressed.length())
Expand All @@ -2728,7 +2717,7 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap
if (aggregatesOnly)
{
// Only store stats for subgraph level
statsMinDepth = 3;
statsMinDepth = 3; // this is subgraph level
statsMaxDepth = 3;
}
statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth);
Expand All @@ -2755,11 +2744,11 @@ void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit)
};

WuScopeFilter filter;
filter.addScopeType(SSTglobal).addScopeType(SSTgraph).addScopeType(SSTsubgraph);
filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph);
const unsigned numStats = aggregateKindsMapping.numStatistics();
for (unsigned i=0; i<numStats; ++i)
filter.addOutputStatistic(aggregateKindsMapping.getKind(i));
filter.setDepth(1,3);
filter.setDepth(1,3); // 1=global, 2=workflow, 3=graph
filter.setSources(SSFsearchGlobalStats);
filter.setIncludeNesting(0);
filter.finishedFilter();
Expand All @@ -2772,11 +2761,17 @@ void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit)

// getCollectionForUpdate() returns IStatisticCollection for the given subgraph
// if clearStats==true then the existing stats are cleared for the given scope and descendants
IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats)
IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats)
{
bool wasCreated;
IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, &wasCreated);
if (clearStats && wasCreated)

StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(graphName));
StatsScopeId wfScopeId(SSTworkflow, wfid);
StatsScopeId sgScopeId(SSTsubgraph, sgId);

IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, wasCreated);
if (clearStats && !wasCreated)
sgScopeCollection->clearStats();
sgScopeCollection->markDirty();
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection);
Expand Down Expand Up @@ -2834,6 +2829,18 @@ void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu)
}
}

// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation)
void GlobalStatisticCollection::pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId)
{
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(graphName));
StatsScopeId wfScopeId(SSTworkflow, wfid);
StatsScopeId sgScopeId(SSTsubgraph, sgId);
IStatisticCollection * sgScopeCollection = statsCollection->querySubScopePath({wfScopeId,graphScopeId, sgScopeId});
if (sgScopeCollection)
sgScopeCollection->pruneChildStats();
}

//---------------------------------------------------------------------------------------------------------------------


Expand Down Expand Up @@ -3903,7 +3910,7 @@ class CDaliWorkUnit;
class CDaliWuGraphStats : public CWuGraphStats
{
public:
CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats)
CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid)
{
}
Expand All @@ -3918,7 +3925,7 @@ class CDaliWuGraphStats : public CWuGraphStats
class CLocalWuGraphStats : public CWuGraphStats
{
public:
CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats)
CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p)
{
}
Expand Down Expand Up @@ -4223,7 +4230,7 @@ class CDaliWorkUnit : public CPersistedWorkUnit
}
}
}
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override
{
return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats);
}
Expand Down Expand Up @@ -4535,7 +4542,7 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
{ c->setGraphState(graphName, wfid, state); }
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
{ c->setNodeState(graphName, nodeId, state); }
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override
{ return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); }
virtual void clearGraphProgress() const
{ c->clearGraphProgress(); }
Expand Down Expand Up @@ -10366,7 +10373,7 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W
{
throwUnexpected(); // Should only be used for persisted workunits
}
IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const
IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const
{
return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats);
}
Expand Down
8 changes: 4 additions & 4 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,6 @@ interface IConstWorkUnitInfo : extends IInterface
virtual IConstWUAppValueIterator & getApplicationValues() const = 0;
};

class GlobalStatisticCollection;
interface IConstWorkUnit : extends IConstWorkUnitInfo
{
virtual bool aborting() const = 0;
Expand Down Expand Up @@ -1302,7 +1301,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const = 0;
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const = 0;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const = 0;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const = 0;
virtual void clearGraphProgress() const = 0;
virtual IStringVal & getAbortBy(IStringVal & str) const = 0;
virtual unsigned __int64 getAbortTimeStamp() const = 0;
Expand Down Expand Up @@ -1789,12 +1788,13 @@ class WORKUNIT_API GlobalStatisticCollection : public CInterface
public:
GlobalStatisticCollection();

void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly);
void load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly);
void loadGlobalAggregates(IConstWorkUnit &workunit);
IStatisticCollection * getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats);
IStatisticCollection * getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats);
bool refreshAggregates();
IStatisticCollection * queryCollection() { return statsCollection; }
void updateAggregates(IWorkUnit *wu);
void pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId);
private:
Owned<IStatisticCollection> statsCollection;
const StatisticsMapping & aggregateKindsMapping;
Expand Down
4 changes: 2 additions & 2 deletions common/workunit/workunit.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public:
virtual void setGraphState(const char *graphName, unsigned wfid, WUGraphState state) const;
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const;
virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats=nullptr) const override;
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override;
void clearGraphProgress() const;
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) {}; //No GraphProgressTree in CLocalWorkUnit.

Expand Down Expand Up @@ -661,7 +661,7 @@ public:
class WORKUNIT_API CWuGraphStats : public CInterfaceOf<IWUGraphStats>
{
public:
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats);
CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats);
virtual void beforeDispose();
virtual IStatisticGatherer & queryStatsBuilder();
protected:
Expand Down
2 changes: 1 addition & 1 deletion ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ public:
graphAgentContext.set(&_agent);
agent = &graphAgentContext;
aborted = false;
globalStats.load(*wu, graphName, true, true);
globalStats.load(*wu, graphName, true);
}

void createFromXGMML(ILoadedDllEntry * dll, IPropertyTree * xgmml);
Expand Down
7 changes: 5 additions & 2 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,9 @@ void EclGraph::updateLibraryProgress()
{
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, &globalStats);

Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, queryGraphName(), cur.id, true); // true=>clear existing stats
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false, sgCollection.getClear());
cur.updateProgress(progress->queryStatsBuilder());
}
}
Expand Down Expand Up @@ -1489,7 +1491,8 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, &globalStats);
Owned<IStatisticCollection> sgCollection = globalStats.getCollectionForUpdate(creatorType, creator, activeWfid, queryGraphName(), subgraph, true); // true=>clear existing stats
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false, sgCollection.getClear());
}

void EclGraph::updateAggregates(IWorkUnit* lockedwu)
Expand Down
4 changes: 2 additions & 2 deletions plugins/cassandra/cassandrawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2743,7 +2743,7 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
class CCassandraWuGraphStats : public CWuGraphStats
{
public:
CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, GlobalStatisticCollection * stats)
CCassandraWuGraphStats(const CCassandraWorkUnit *_parent, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats)
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats),
progress(createPTree(_rootScope)), parent(_parent)
{
Expand All @@ -2764,7 +2764,7 @@ class CCassandraWorkUnit : public CPersistedWorkUnit
StringAttr wuid;
};

IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, GlobalStatisticCollection * stats) const override
IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override
{
return new CCassandraWuGraphStats(this, creatorType, creator, wfid, graphName, subgraph, merge, stats);
}
Expand Down
Loading

0 comments on commit 49fde9e

Please sign in to comment.