-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running #17786
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -183,14 +183,17 @@ void doDescheduleWorkkunit(char const * wuid) | |
* Graph progress support | ||
*/ | ||
|
||
CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge) | ||
CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * statsCollection) | ||
: creatorType(_creatorType), creator(_creator), id(_id), merge(_merge) | ||
{ | ||
StatsScopeId graphScopeId; | ||
verifyex(graphScopeId.setScopeText(_rootScope)); | ||
StatsScopeId wfScopeId(SSTworkflow,wfid); | ||
|
||
StatsScopeId rootScopeId(SSTworkflow,wfid); | ||
collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId)); | ||
if (statsCollection) | ||
collector.setown(createStatisticsGatherer(statsCollection)); | ||
else | ||
collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); | ||
collector->beginScope(graphScopeId); | ||
} | ||
|
||
|
@@ -2657,85 +2660,104 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu | |
} | ||
return totalCost; | ||
} | ||
} | ||
}; | ||
|
||
//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified) | ||
cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope) | ||
GlobalStatisticCollection::GlobalStatisticCollection() : aggregateKindsMapping(aggregateKindStatistics) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class name needs a bit of thought. I think it is really a GlobalStatisticAggregator, which happens to contain a collection. The name is important because it helps developers know what the purpose of the class is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking that we'd have a single location for all the stats being gathered and aggregated. I was trying to avoid having one class to gather and then have another for aggregation - and then having to copy stats between the two. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think one of the other reasons it was more generalized (not just a aggregator), was that there was a separate future intent for it to manage the gathered stat. collections - and publish them en masse periodically (rather than end of sg as it is now). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok. That could be achieved more simply by saving away a list of blobs to commit, and writing them all in one transaction. I'm not sure if this is worth changing now - I will think about it, but it does complicate the implementation by combining two different concepts and needing to worry about pruning results etc. It makes it harder to understand the responsibilities and implementation of the class etc. |
||
{ | ||
WuScopeFilter filter; | ||
if (!isEmptyString(scope)) | ||
filter.addScope(scope); | ||
else | ||
filter.addScope(""); // Needed to match scope | ||
// when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost | ||
// (Costs from child graphs and activities should have been summed up to graph/subgraph level already) | ||
// when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost | ||
// (Costs from all levels below graph should be summed upto at least graph level already) | ||
// i.e. need 2 levels of nesting | ||
filter.setIncludeNesting(2); | ||
// includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats", | ||
// causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global" | ||
filter.addSource("global"); | ||
filter.addOutputStatistic(StCostFileAccess); | ||
filter.addRequiredStat(StCostFileAccess); | ||
filter.finishedFilter(); | ||
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter); | ||
cost_type totalCost = 0; | ||
for (it->first(); it->isValid(); ) | ||
{ | ||
cost_type value = 0; | ||
if (it->getStat(StCostFileAccess, value)) | ||
{ | ||
totalCost += value; | ||
it->nextSibling(); | ||
} | ||
else | ||
{ | ||
it->next(); | ||
} | ||
} | ||
return totalCost; | ||
// Construct statsCollection here as GlobalStatisticCollection::load() is optional | ||
StatsScopeId globalScopeId(SSTglobal, (unsigned)0); | ||
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); | ||
} | ||
|
||
void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill) | ||
void GlobalStatisticCollection::loadExistingAggregates(IConstWorkUnit &workunit) | ||
{ | ||
WuScopeFilter filter; | ||
if (!isEmptyString(scope)) | ||
filter.addScope(scope); | ||
else | ||
const char * _wuid = workunit.queryWuid(); | ||
jakesmith marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit | ||
{ | ||
filter.addScope(""); | ||
filter.addSource("global"); | ||
StatsScopeId globalScopeId(SSTglobal, (unsigned)0); | ||
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); | ||
wuid.set(_wuid); | ||
} | ||
filter.setIncludeNesting(1); | ||
filter.addOutputStatistic(StSizeGraphSpill); | ||
filter.addRequiredStat(StSizeGraphSpill); | ||
filter.finishedFilter(); | ||
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter); | ||
peakSizeSpill = 0; | ||
for (it->first(); it->isValid(); ) | ||
|
||
class StatsCollectionAggregatesLoader : public IWuScopeVisitor | ||
{ | ||
stat_type value = 0; | ||
if (it->getStat(StSizeGraphSpill, value)) | ||
public: | ||
StatsCollectionAggregatesLoader(IStatisticCollection * _statsCollection) : statsCollection(_statsCollection) {} | ||
|
||
virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override | ||
{ | ||
if (value>peakSizeSpill) | ||
peakSizeSpill = value; | ||
it->nextSibling(); | ||
statsCollection->setStatistic(extra.queryScope(), kind, value); | ||
} | ||
else | ||
virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); } | ||
virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); } | ||
virtual void noteException(IConstWUException & exception) override { throwUnexpected(); } | ||
private: | ||
Linked<IStatisticCollection> statsCollection; | ||
}; | ||
|
||
WuScopeFilter filter; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At first glance this looks more complicated than I would expect. I suspect an IConstWorkuit::loadGlobalStats(IStatisticCollection * _statsCollection) could be much more efficient. Will return after looking at the rest. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ghalliday You were going to come back to do this. Any ideas on making this more efficient? Btw, this will be called once per graph. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it could be made more efficient by directly walking the stats list. If this works I wouldn't change it for this PR though - something to return to and simplify the code. |
||
filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph); | ||
const unsigned numStats = aggregateKindsMapping.numStatistics(); | ||
for (unsigned i=0; i<numStats; ++i) | ||
filter.addOutputStatistic(aggregateKindsMapping.getKind(i)); | ||
filter.setDepth(1,3); // 1=global, 2=workflow, 3=graph | ||
filter.setSources(SSFsearchGlobalStats); | ||
filter.setIncludeNesting(0); | ||
filter.finishedFilter(); | ||
|
||
StatsCollectionAggregatesLoader aggregatesLoader(statsCollection); | ||
Owned<IConstWUScopeIterator> iter = &workunit.getScopeIterator(filter); | ||
ForEach(*iter) | ||
iter->playProperties(aggregatesLoader); | ||
} | ||
|
||
// getCollectionForUpdate() returns IStatisticCollection for the given subgraph | ||
// if clearStats==true then the existing stats are cleared for the given scope and descendants | ||
IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(StatisticCreatorType creatorType, const char * creator, unsigned wfid, const char *graphName, unsigned sgId, bool clearStats) | ||
{ | ||
StatsScopeId graphScopeId; | ||
verifyex(graphScopeId.setScopeText(graphName)); | ||
StatsScopeId wfScopeId(SSTworkflow, wfid); | ||
StatsScopeId sgScopeId(SSTsubgraph, sgId); | ||
|
||
bool wasCreated; | ||
IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, wasCreated); | ||
if (clearStats && !wasCreated) | ||
sgScopeCollection->clearStats(); | ||
jakesmith marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Marking the collection dirty here is not ideal. It would be better to have a call to IStatisticCollection::setStatistic mark the scope as dirty. | ||
// However, this would be inefficient as each call to IStatisticCollection::setStatistic would require the dirty flag to be set for all parent scopes. | ||
sgScopeCollection->markDirty(); | ||
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is confusing. It is adding a "Root" scope with an id of wfid underneath a scope of workflow/graph/subgraph. So effectively in the collection it is global->wfid->graph->subgraph->wfid->. |
||
} | ||
|
||
// Recalculate aggregates and then write the aggregates to global stats (dali) | ||
void GlobalStatisticCollection::updateAggregates(IWorkUnit *wu) | ||
{ | ||
struct AggregateUpdatedCallBackFunc : implements IWhenAggregateUpdatedCallBack | ||
{ | ||
Linked<IWorkUnit> wu; | ||
AggregateUpdatedCallBackFunc(IWorkUnit *_wu) : wu(_wu) {} | ||
void operator () (const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) | ||
{ | ||
it->next(); | ||
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace); | ||
} | ||
} | ||
} aggregateUpdatedCallBackFunc(wu); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be simpler to use a std::function, e.g.:
|
||
|
||
statsCollection->refreshAggregates(aggregateKindsMapping, aggregateUpdatedCallBackFunc); | ||
} | ||
|
||
void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType) | ||
// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation) | ||
void GlobalStatisticCollection::pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure this is the right approach - I will think about it. My initial expectation would be the subgraph stats are gathered in the same way they always have been, and then they are passed to the global stats gatherer to extract the top level items. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was couple of reasons for implementing the functionality like this
I can leave the existing gatherer code as is and merge the necessary stats from the gatherer into the GlobalAggregator. Let me know if you think we should do this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Let's arrange a walkthrough to discuss before heading off an making any changes though. |
||
{ | ||
stat_type peakSizeSpill = 0; | ||
gatherSpillSize(wu, scope, peakSizeSpill); | ||
if (peakSizeSpill) | ||
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax); | ||
StatsScopeId graphScopeId; | ||
verifyex(graphScopeId.setScopeText(graphName)); | ||
StatsScopeId wfScopeId(SSTworkflow, wfid); | ||
StatsScopeId sgScopeId(SSTsubgraph, sgId); | ||
IStatisticCollection * sgScopeCollection = statsCollection->querySubScopePath({wfScopeId,graphScopeId, sgScopeId}); | ||
if (sgScopeCollection) | ||
sgScopeCollection->pruneChildStats(); | ||
} | ||
|
||
//--------------------------------------------------------------------------------------------------------------------- | ||
|
||
|
||
|
@@ -3805,8 +3827,8 @@ class CDaliWorkUnit; | |
class CDaliWuGraphStats : public CWuGraphStats | ||
{ | ||
public: | ||
CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) | ||
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), owner(_owner), graphName(_rootScope), wfid(_wfid) | ||
CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) | ||
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), owner(_owner), graphName(_rootScope), wfid(_wfid) | ||
{ | ||
} | ||
protected: | ||
|
@@ -3820,8 +3842,8 @@ class CDaliWuGraphStats : public CWuGraphStats | |
class CLocalWuGraphStats : public CWuGraphStats | ||
{ | ||
public: | ||
CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge) | ||
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), graphName(_rootScope), p(_p) | ||
CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge, IStatisticCollection * stats) | ||
: CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge, stats), graphName(_rootScope), p(_p) | ||
{ | ||
} | ||
protected: | ||
|
@@ -4125,9 +4147,9 @@ class CDaliWorkUnit : public CPersistedWorkUnit | |
} | ||
} | ||
} | ||
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override | ||
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const override | ||
{ | ||
return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge); | ||
return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge, stats); | ||
} | ||
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree) | ||
{ | ||
|
@@ -4437,8 +4459,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa | |
{ c->setGraphState(graphName, wfid, state); } | ||
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const | ||
{ c->setNodeState(graphName, nodeId, state); } | ||
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override | ||
{ return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge); } | ||
virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats=nullptr) const override | ||
{ return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge, stats); } | ||
virtual void clearGraphProgress() const | ||
{ c->clearGraphProgress(); } | ||
virtual IStringVal & getAbortBy(IStringVal & str) const | ||
|
@@ -10268,9 +10290,9 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W | |
{ | ||
throwUnexpected(); // Should only be used for persisted workunits | ||
} | ||
IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const | ||
IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge, IStatisticCollection * stats) const | ||
{ | ||
return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge); | ||
return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge, stats); | ||
} | ||
|
||
void CLocalWUGraph::setName(const char *str) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Does this imply root scope has changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it hasn't. It was previously called "rootScopeId" but it was actually a workflow scope. Line 192 of the original code has "StatsScopeId rootScopeId(SSTworkflow,wfid)". I changed the name from rootScopeId to workflowScopeId as it was being confused with _rootScope.