HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running #17786
Conversation
https://track.hpccsystems.com/browse/HPCC-29657 |
d34c72e
to
c2e13ef
Compare
Note: Work in progress hence draft PR |
@shamser looks good. A few comments.
The main question in my mind is where should stats that are going to be aggregated be stored? This change has moved them from the global stats to the subgraph. I think that makes logical sense, but I don't know what implications that has for efficiency. (Logically the execution times should also move, but that would have a performance impact.)
The other question in my mind is what is the minimal set of changes which would allow this change to be merged?
The other thought is
system/jlib/jstats.cpp
Outdated
@@ -1825,7 +1813,7 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection> | |||
} | |||
virtual StringBuffer & getFullScope(StringBuffer & str) const override | |||
{ | |||
if (parent) | |||
if (parent && queryScopeType()!=SSTworkflow) |
A better test would be parent->queryScopeType() != SSTglobal. (It would work with compile stages as well.)
system/jlib/jstats.cpp
Outdated
@@ -1988,9 +2000,58 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection> | |||
cur.visit(visitor); | |||
} | |||
|
|||
virtual bool refreshAggregates(std::vector<StatisticKind> & aggregateKinds, std::vector<unsigned __int64> & totals) override |
Would a CRuntimeStatisticCollection (instead of aggregateKinds, totals) simplify this code? The dual arrays seem to implement the same idea.
CRuntimeStatisticCollection cannot store the variants, so when the stats are deserialized we would have to ignore all of them. Losing the variants would mean that we could not have a single in-memory object for the stats: we would need one for aggregation (GlobalStatisticCollection) and another for serialization (CStatisticGatherer). We would also not be able to extend the functionality to use GlobalStatisticCollection to cache runtime stats and allow periodic updates to Dali.
Also, note that a new CRuntimeStatisticCollection would need to be created for each scope, and each one would have a StatisticMapping. With the current implementation, a single vector with the StatisticKinds is used as it traverses the entire StatisticsCollection. I think using CRuntimeStatisticCollection would be slower and use more memory.
system/jlib/jstats.cpp
Outdated
StatsScopeId id; | ||
id.deserialize(serialized, version); | ||
IStatisticCollection * collection = ensureSubScope(id, true); | ||
collection->deserialize(serialized, version); |
Optimization for later. Indicate to deserialize that it does not need to deserialize beyond a subgraph. Would avoid all the activities being deserialized.
That option might be usable by other stats iterators.
Noted. I'll avoid deserializing unnecessary scopes.
system/jlib/jstats.cpp
Outdated
class GlobalStatisticCollection : public CStatisticCollection | ||
{ | ||
public: | ||
GlobalStatisticCollection(IPropertyTree * root) : CStatisticCollection(nullptr, globalScopeId) |
This code belongs in workunit.cpp, rather than jlib, because it is specific to the way stats are represented in workunits. It should be possible to implement it using the public interface for the stats collection.
system/jlib/jstats.cpp
Outdated
while (totalIter != totals.end()) | ||
{ | ||
StatsMergeAction mergeAction = queryMergeMode(*kindIter); | ||
updateStatistic(*kindIter, *subTotalIter, mergeAction); |
I think this should probably replace rather than merge - since this is the new aggregated value.
I didn't move execution times because there was some special aggregation code that handled the case where workflow and hthor were on the same CPU. Also, the aggregate of execution time at subgraph level would not necessarily match the execution time of the parent scopes: for example, there were small gaps between subgraphs which would be lost if we simply aggregated the subgraphs. I did move StSizeSpillFile, StSizeGraphSpill and StCostFileAccess, which I thought wouldn't impact performance too much. Are there instances that you are aware of that would be impacted by these changes? @ghalliday
@ghalliday I pushed 2 new commits:
With the second commit, I eliminated the loading of stats from Dali for aggregation in Thor. I'm making similar changes in hThor.
f6cac47
to
1347444
Compare
thorlcr/master/thdemonserver.cpp
Outdated
cost_type costDiskAccess = graph->getDiskAccessCost(); | ||
if (costDiskAccess) | ||
stats.addStatistic(StCostFileAccess, costDiskAccess); | ||
|
This should really move to MasterGraph::getStats()
c838bcd
to
0184f96
Compare
system/jlib/jstats.cpp
Outdated
{ | ||
if (updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace)) | ||
updated=true; | ||
(*totalIter) += *subTotalIter; |
same question as on line 2030 - how does this work for e.g. StatsMergeMax
yes, this has been done already.
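The merge-mode questions in this thread (replace versus merge when storing a recomputed aggregate, and how kinds such as StatsMergeMax should be totalled) can be illustrated in isolation. This is a minimal sketch only: `MergeAction` and `mergeValue` are hypothetical stand-ins for `StatsMergeAction` and the jlib merge logic, not the real API.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical stand-in for jlib's StatsMergeAction; names are illustrative only.
enum class MergeAction { Replace, Sum, Min, Max };

// Combine an existing value with an incoming one according to the merge action.
inline uint64_t mergeValue(uint64_t existing, uint64_t incoming, MergeAction action)
{
    switch (action)
    {
    case MergeAction::Sum: return existing + incoming;
    case MergeAction::Min: return std::min(existing, incoming);
    case MergeAction::Max: return std::max(existing, incoming);
    case MergeAction::Replace: break;
    }
    return incoming; // Replace: the incoming value wins
}
```

Under these assumptions, child subtotals would be folded into a parent total with the kind's own action (so a max-style kind is not summed), while the freshly computed total would be stored with `Replace` so that republishing it does not double count.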
system/jlib/jstats.cpp
Outdated
std::vector<unsigned __int64>::iterator totalIter = totals.begin(); | ||
std::vector<unsigned __int64>::iterator subTotalIter = childTotals.begin(); | ||
std::vector<StatisticKind>::iterator kindIter = aggregateKinds.begin(); | ||
while (totalIter != totals.end()) |
I think this may need to look at totals conditionally: not all aggregateKinds may have been found in the collection, in which case the corresponding totals will not have been set and will contain 0. Unconditionally setting 0 for all aggregate types may not be correct.
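One way to distinguish "never seen" from "seen with value 0" is to keep a flag alongside each total. The following is a minimal sketch under that assumption; `AggregateTotals` and its members are hypothetical names, not part of jlib.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Totals for a fixed list of aggregate kinds, plus a flag per kind recording
// whether any scope actually supplied a value. All names here are hypothetical.
struct AggregateTotals
{
    explicit AggregateTotals(std::size_t numKinds) : values(numKinds, 0), found(numKinds, false) {}

    // Fold one subtotal into slot `index` and remember that the kind was seen.
    void add(std::size_t index, uint64_t value)
    {
        values[index] += value;
        found[index] = true;
    }

    // Number of kinds that should be written back (those seen at least once).
    std::size_t numFound() const
    {
        std::size_t n = 0;
        for (bool f : found)
            if (f)
                ++n;
        return n;
    }

    std::vector<uint64_t> values;
    std::vector<bool> found;
};
```

With this shape, the write-back loop would skip any slot whose `found` flag is false rather than storing a 0 that no scope reported.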
common/workunit/workunit.cpp
Outdated
|
||
void GlobalStatisticCollection::load(const char *_wuid, unsigned statsMaxDepth, bool missingScopesOnly) | ||
{ | ||
if (strcmp(_wuid, wuid.str())!=0) // Make sure stats collection is for this workunit |
probably clearer if load is the route that always creates the collection, i.e. no need for ctor route and code looks like:
if (!streq(_wuid, wuid)) // Make sure stats collection is for this workunit
{
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(nullptr, globalScopeId));
wuid.set(_wuid);
}
It also highlights that a Thor instance ping-ponging between multiple jobs will not reuse previously loaded stats (and will have to reload all prior stats each time).
When it only loads dirty levels, it won't matter much. And/or we could keep several GlobalStatisticCollection objects around for the last N jobs, to avoid recreating them.
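The idea of keeping collections for the last N jobs could look roughly like the following. This is a sketch under stated assumptions: `StatsCollection` is a placeholder for the real per-workunit collection, and `RecentCollections` is a hypothetical helper, not existing HPCC code.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>
#include <string>

// Placeholder for the per-workunit stats collection (hypothetical).
struct StatsCollection { std::string wuid; };

// Keeps the most recently used collections, evicting the least recently used.
class RecentCollections
{
public:
    explicit RecentCollections(std::size_t maxEntries) : limit(maxEntries)
    {
        assert(maxEntries > 0);
    }

    // Return the collection for wuid, creating it if it is not cached.
    std::shared_ptr<StatsCollection> get(const std::string & wuid)
    {
        for (auto it = entries.begin(); it != entries.end(); ++it)
        {
            if ((*it)->wuid == wuid)
            {
                entries.splice(entries.begin(), entries, it); // move to front
                return entries.front();
            }
        }
        auto created = std::make_shared<StatsCollection>();
        created->wuid = wuid;
        entries.push_front(created);
        if (entries.size() > limit)
            entries.pop_back(); // evict the least recently used entry
        return created;
    }

    std::size_t size() const { return entries.size(); }

private:
    std::size_t limit;
    std::list<std::shared_ptr<StatsCollection>> entries;
};
```

A linear search is used because N is expected to be small; whether caching is worthwhile at all depends on how cheap loading only the dirty levels turns out to be.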
@shamser - please see inline comments, and comments below. If I understand it correctly, the current implementation does:
Goals:
As it stands, when a new thor instance picks up a running workunit, let's say to run graph100, I think it will:
As it stands, there would be too much of a performance hit on medium to big size jobs, unless you had just 1 Thor instance that worked on the whole job from start to finish. To overcome that, as we've discussed and as you've alluded to in one of your comments, we need a mechanism which tracks what levels are dirty. I think this can be achieved by adding a dirty flag (not sure whether best under or ) to each level that is active, e.g. when WF2 starts, it is marked dirty, and the flag is only cleared when it completes. updateAggregates needs to recalculate and push all aggregates for any dirty layer (the top/global level is implicitly dirty if the job is running anything). In practice that means, that when it's e.g. running graph100 in wf5 and the job lands on a new Thor instance, global+wf5+graph100 should be the only things that are marked dirty. |
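The dirty-flag idea described above can be sketched on a toy scope tree. This is a minimal illustration, not the PR's implementation: `Scope`, `markDirty` and `refresh` are hypothetical names, a single summed value stands in for the aggregated statistics, and leaves play the role of subgraphs.

```cpp
#include <cstdint>
#include <memory>
#include <vector>

// Toy scope node: leaves stand in for subgraphs, inner nodes for graph/workflow/global.
struct Scope
{
    Scope * parent = nullptr;
    std::vector<std::unique_ptr<Scope>> children;
    uint64_t value = 0;      // recorded (leaf) or aggregated (inner) value
    bool dirty = false;
    unsigned recomputed = 0; // how many times this scope was re-aggregated

    Scope * addChild()
    {
        children.push_back(std::unique_ptr<Scope>(new Scope));
        children.back()->parent = this;
        return children.back().get();
    }

    // Mark this scope and its ancestors dirty, stopping at the first ancestor
    // that is already dirty (its own ancestors must be dirty too).
    void markDirty()
    {
        for (Scope * s = this; s && !s->dirty; s = s->parent)
            s->dirty = true;
    }
};

// Return the total for a scope, re-aggregating only the dirty inner scopes.
inline uint64_t refresh(Scope & scope)
{
    if (scope.children.empty() || !scope.dirty)
    {
        scope.dirty = false;
        return scope.value; // leaf, or clean subtree: stored value is accurate
    }
    uint64_t total = 0;
    for (auto & child : scope.children)
        total += refresh(*child);
    scope.value = total;
    scope.dirty = false;
    ++scope.recomputed;
    return total;
}
```

In this model, a job that lands on a new instance and touches one subgraph would only cause the scopes on the path from that subgraph to the root to be recomputed; sibling workflows keep their stored aggregates.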
0184f96
to
25bdaa9
Compare
I've modified the design since this comment was posted. I was actually testing the changes before I saw these code review comments. Although the design is different from what you've suggested, I feel that it achieves that efficiency goal. Please can you have a look at the PR again. @jakesmith |
68305df
to
ceef1ae
Compare
fa65242
to
3f71b3f
Compare
@shamser - looks good and close. There are a couple of leaks that need fixing, some trivial things and a couple of questions, but I suspect those could be done in a subsequent PR.
Please address the leak/trivials for now.
@@ -1134,7 +1136,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, | |||
cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); | |||
if (cost) | |||
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); |
Will these graph CostExecute stats be aggregated at the workflow level and the global level?
The workflow level cost includes not only all the subgraph costs but also workflow engine cost. There is special code to handle this calculation: when calculating workflow level cost, it adds the additional workflow engine cost to the subgraph cost aggregates. Unfortunately, this also means that the graph aggregates cannot be calculated using the new method, as it would override workflow cost calculated with the special cost.
I have looked into making use of the global stats collection object to handle CostExecute. It may make the code cleaner. However, this is not a trivial change and probably isn't worth doing in this PR. (The workflow engine costs complicate things too much.)
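The arithmetic behind the workflow-level cost, as described in this thread, can be written out as a small sketch. `workflowCost` is a hypothetical helper and the costs are modelled as plain unsigned integers purely for illustration.

```cpp
#include <cstdint>
#include <numeric>
#include <vector>

// Workflow cost = sum of the graph-level execute costs + the workflow engine's own cost.
// Hypothetical helper; cost values are modelled as plain integers here.
inline uint64_t workflowCost(const std::vector<uint64_t> & graphCosts, uint64_t engineCost)
{
    uint64_t aggregated = std::accumulate(graphCosts.begin(), graphCosts.end(), uint64_t(0));
    return aggregated + engineCost;
}
```

This shows why a plain child aggregate stored with a replace would lose the engine component: the aggregate alone corresponds to calling the helper with `engineCost` set to 0.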
common/workunit/workunit.cpp
Outdated
{ | ||
StatisticKind kind = aggregateKindsMapping.getKind(i); | ||
stat_type value; | ||
if (cur.getStatistic(kind, value) && value) |
You do sometimes want to set the value when it is 0; it depends on the kind. I think this should be using includeStatisticIfZero(kind).
common/workunit/workunit.cpp
Outdated
if (refreshAggregates()) // Only serialize if the aggregates has changed | ||
{ | ||
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping); | ||
statsCollection->visit(statsAggregatorWriter); |
related to the includeStatisticIfZero comment, should this be doing something like : ?
Owned<IStatisticGatherer> globalStatsGatherer = createGlobalStatisticGatherer(wu);
statsCollection->recordStatistics(globalStatsGatherer);
CRuntimeStatisticCollection::recordStatistics handles things like includeStatisticIfZero, merge mode and measure conversion.
createGlobalStatisticGatherer builds up a scope as it's descending.
statsAggregatorWriter is filtering out the required stats and writing out only those stats. It filters out some global stats that have been loaded in global stats such as dfu stats, compile stats etc. Stats at the subgraph level should also be ignored (not written to global wu stats).
Also, IStatisticCollection/CStatisticCollection doesn't have a recordStatistics function.
e786af8
to
1925dc9
Compare
system/jlib/jstats.cpp
Outdated
{ | ||
const StatisticsMapping & mapping = totals.queryMapping(); | ||
bool updated = false; | ||
if (queryScopeType()==SSTsubgraph || isDirty==false) |
Is this correct? As written, the first block is taken:
- unconditionally if subgraph (even if not marked dirty)
- || if isDirty==false (so it is skipped if markDirty() was called for this subgraph)
I guess I'm not clear why the whole thing is not wrapped in an if (isDirty) instead.
e.g. shouldn't refreshAggregates be a no-op if it were called before getCollectionForUpdate (which calls markDirty) has been used?
I agree: checking isDirty is sufficient. I had the subgraph test because the isDirty flag was implemented later and I didn't reassess the need for the subgraph test.
I modified the code so that if isDirty is false at the global level, the operation is a nop.
Ignore the previous comment. The unconditional sg test is required. The first block (if condition true) returns the totals at this level. The second block (if condition false) descends to a lower scope level to get totals from lower level.
It should return totals from the current level under 2 circumstances
- isDirty==false: the stats at this level and descendant levels haven't changed, so it is ok to use the aggregates from this level
- it is at subgraph scope: this condition is required because even if the dirty flag is true, aggregates should only be generated from subgraph level. As per the agreed requirements, the aggregates generated automatically should not be generated from activities->subgraph. (At a later date, if it is determined that aggregates should be generated from aggregates, this condition may be removed but it is beyond the scope of this jira for this change now.)
system/jlib/jstats.cpp
Outdated
return new CStatisticCollection(parent, scopeId); | ||
} | ||
|
||
IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScopeId, IStatisticCollection * childCollection) |
I think childCollection would be clearer if called sgCollection (as it is in CRootStatisticCollection)
(and also in jstats.h)
@shamser - please see new comments.
Mostly cosmetic and 1 question re. isDirty handling: https://github.com/hpcc-systems/HPCC-Platform/pull/17786/files#r1377617622
0621714
to
fb9a872
Compare
fb9a872
to
3185f97
Compare
@shamser Broadly the change looks good. There are two general comments:
- I expected the collection code to stay as it is, and then have a function that merges the subgraph(/graph) stats into the aggregator as a separate call. Storing the stats in the global stats aggregator seems to be overloading the meaning of that class.
- The public interface to the IStatisticsCollection class expanded by more than I expected. Some could be implemented as global functions, some indicate logic isn't quite in the right place.
Also a few bits of code where the logic does not look right/should be deleted.
- More information is leaked out of the jstats files than I would expect.
: creatorType(_creatorType), creator(_creator), id(_id), merge(_merge) | ||
{ | ||
StatsScopeId graphScopeId; | ||
verifyex(graphScopeId.setScopeText(_rootScope)); | ||
StatsScopeId wfScopeId(SSTworkflow,wfid); |
Q: Does this imply root scope has changed?
No it hasn't. It was previously called "rootScopeId" but it was actually a workflow scope. Line 192 of the original code has "StatsScopeId rootScopeId(SSTworkflow,wfid)". I changed the name from rootScopeId to workflowScopeId as it was being confused with _rootScope.
common/workunit/workunit.cpp
Outdated
} | ||
|
||
loadGlobalAggregates(workunit); | ||
if (isEmptyString(graphName)) |
Almost certainly should be if (!isEmptyString()). Suggests this code hasn't been walked through, or graphName is always empty
Looks like graphName is always empty. Better to check and assert rather than include untested code in the PR.
It was originally if (*graphName), and that was tested. However, at the last minute I changed the code to use isEmptyString. I know I shouldn't have done that: I should have tested every change, even a seemingly minor non-functional one.
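The inverted condition discussed in this thread is easy to see with a small stand-in. This sketch assumes isEmptyString returns true for a null pointer or an empty string; `isEmpty` and `hasGraphName` are hypothetical local helpers, not the jlib functions.

```cpp
// Local stand-in with the behaviour isEmptyString is assumed to have:
// true for a null pointer or an empty string.
inline bool isEmpty(const char * text) { return !text || !*text; }

// True when a graph name was supplied, i.e. the sense of the original `if (*graphName)`.
inline bool hasGraphName(const char * graphName) { return !isEmpty(graphName); }
```

Replacing `if (*graphName)` with `if (isEmptyString(graphName))` therefore flips the branch; the equivalent form needs the negation, and it additionally tolerates a null pointer.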
common/workunit/workunit.cpp
Outdated
if (!compressed.length()) | ||
break; | ||
MemoryBuffer serialized; | ||
decompressToBuffer(serialized, compressed); |
note to self about inefficiency of lzw...
common/workunit/workunit.cpp
Outdated
|
||
StatsScopeId childId; | ||
childId.deserialize(serialized, version); | ||
int statsMinDepth = 0, statsMaxDepth = INT_MAX; |
Should be outside the main loop.
I've removed the code to load sg stats into the global collection, so this entire section has been removed.
system/jlib/jstats.cpp
Outdated
else | ||
{ | ||
StatsScopeId childScopeId; | ||
if (!childScopeId.setScopeText(scope, &scope) || (*scope!=':' && *scope!='\0')) |
Clearer to use different variables for input and output. E.g. next.
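The suggestion to keep the input and output positions in separate variables can be sketched as follows. This is illustrative only: `parseSegment` and `splitScope` are hypothetical helpers that split a ':'-separated scope string, not the real StatsScopeId::setScopeText.

```cpp
#include <cstring>
#include <string>
#include <vector>

// Hypothetical helper: copy one ':'-delimited segment starting at `text` into
// `segment` and report where parsing stopped via `next`.
// Returns false if the segment is empty.
inline bool parseSegment(const char * text, std::string & segment, const char * & next)
{
    const char * end = std::strchr(text, ':');
    if (!end)
        end = text + std::strlen(text);
    segment.assign(text, end);
    next = end;
    return !segment.empty();
}

// Split a scope such as "w1:graph2:sg3" into its segments.
inline bool splitScope(const char * scope, std::vector<std::string> & out)
{
    const char * cur = scope;
    for (;;)
    {
        std::string segment;
        const char * next = nullptr; // output position, distinct from the input `cur`
        if (!parseSegment(cur, segment, next))
            return false;
        out.push_back(segment);
        if (*next == '\0')
            return true;
        cur = next + 1; // skip the ':' separator
    }
}
```

Keeping `cur` and `next` apart makes it obvious at the call site which pointer is consumed and which one is produced.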
system/jlib/jstats.cpp
Outdated
@@ -1862,6 +1868,24 @@ class CStatisticCollection : public CInterfaceOf<IStatisticCollection> | |||
} | |||
return false; | |||
} | |||
virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 & value) override |
Doesn't really need to be a member function - could be implemented using the existing public interface.
It is used from workunit.cpp by the aggregate loader. It could be done using the existing member functions but it would be cumbersome to add code to the aggregate loader to split the scopes and navigate down the tree.
I agree it should be in a function. The comment is that it could be a global function - which would reduce the number of functions in the interface. Debatable style wise, but was connected to the large number of new functions in the interface.
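The non-member alternative discussed here could look roughly like this. It is a sketch against a deliberately simplified, hypothetical `Collection` class (not IStatisticCollection); the point is only that the path walk needs nothing beyond public calls.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <string>

// Hypothetical, much simplified collection exposing a small public interface.
class Collection
{
public:
    // Return the named child scope, creating it if necessary.
    Collection & ensureSubScope(const std::string & id)
    {
        std::unique_ptr<Collection> & child = children[id];
        if (!child)
            child.reset(new Collection);
        return *child;
    }
    void setStatistic(int kind, uint64_t value) { stats[kind] = value; }
    bool getStatistic(int kind, uint64_t & value) const
    {
        auto it = stats.find(kind);
        if (it == stats.end())
            return false;
        value = it->second;
        return true;
    }
private:
    std::map<std::string, std::unique_ptr<Collection>> children;
    std::map<int, uint64_t> stats;
};

// Non-member helper: walk a ':'-separated scope path using only public calls,
// then set the statistic on the final scope.
inline void setStatisticByScope(Collection & root, const std::string & scope, int kind, uint64_t value)
{
    Collection * cur = &root;
    std::string::size_type start = 0;
    while (start < scope.size())
    {
        std::string::size_type end = scope.find(':', start);
        if (end == std::string::npos)
            end = scope.size();
        cur = &cur->ensureSubScope(scope.substr(start, end - start));
        start = end + 1;
    }
    cur->setStatistic(kind, value);
}
```

The trade-off is stylistic: the helper keeps the interface small, at the cost of one extra free function for callers such as the aggregate loader to find.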
system/jlib/jstats.cpp
Outdated
} | ||
else | ||
{ | ||
deserializeNoStats(in, version); |
This seems a shame. Can it be avoided for the common case, i.e. where you are only reading the subgraph stats which are the root for hthor/thor graphs?
{ | ||
// descend down to lower level to obtain totals required for aggregates and then aggregate | ||
CRuntimeStatisticCollection childTotals(mapping); | ||
Owned<IBitSet> childTotalUpdated = createBitSet(mapping.numStatistics()); |
This logic does not seem correct. totals will only be updated with any values that have changed, but in the not-dirty case all values are updated.
Agree, I don't follow the logic (related to previous comment here: #17786 (comment)).
The true condition will be followed unconditionally for all subgraphs (regardless of whether isDirty = true or false), which looks like it means everything above every subgraph will also be updated.
I think it achieves the goal, but I am still finding how it does so a bit confusing.
What I'd expect, I think, is for each level to do:
- If at level and not dirty, do nothing.
- If at level and is subgraph, do nothing.
- If dirty, recurse down each scope, and then merge the child values into new totals at current level.
- When finished recursing down each child scope and merging into new totals, update what has changed into current level stats, and push totals (via callback if necessary).
I had a stab at what that would look like, and I also think it should be done without adding the two quite implementation-oriented 'refreshAggregates' methods to IStatisticCollection:
bool refreshAggregates(IStatisticCollection &stats, const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated)
{
    if (stats.queryScopeType() == SSTsubgraph) // don't descend any further
        return false;
    if (!stats.isDirty())
        return false;
    bool anyChanges = false;
    CRuntimeStatisticCollection totals(mapping);
    Owned<IBitSet> updated = createBitSet(mapping.numStatistics());
    Owned<IStatisticCollectionIterator> childIter = &stats.getScopes(nullptr, false);
    ForEach(*childIter)
    {
        IStatisticCollection &child = childIter->query();
        refreshAggregates(child, mapping, fWhenAggregateUpdated);
        // merge in child level stats into current level, and mark which aggregate kinds have updated
        unsigned numChildStats = child.getNumStatistics();
        for (unsigned s=0; s<numChildStats; s++)
        {
            StatisticKind kind;
            unsigned __int64 value;
            child.getStatistic(kind, value, s);
            if (kind != (StatisticKind)(kind & StKindMask))
                continue; // ignore variants->not supported by CRuntimeStatisticCollection
            unsigned index = mapping.getIndex(kind);
            if (index != mapping.numStatistics())
            {
                totals.mergeStatistic(kind, value);
                updated->set(index); // NB: may already have been set whilst merging in another child, but that's okay.
            }
        }
    }
    // any updates? Set new totals into current level, and call callback
    unsigned nextUpdateIndex = 0;
    while (true)
    {
        nextUpdateIndex = updated->scan(nextUpdateIndex, true);
        if (NotFound == nextUpdateIndex)
            break;
        StatisticKind kind = mapping.getKind(nextUpdateIndex);
        unsigned __int64 value = totals.queryStatisticByIndex(nextUpdateIndex).get();
        if (stats.updateStatistic(kind, value, StatsMergeReplace))
        {
            if (value || includeStatisticIfZero(kind))
            {
                StringBuffer s;
                fWhenAggregateUpdated(stats.getFullScope(s).str(), stats.queryScopeType(), kind, value);
                anyChanges = true;
            }
        }
        nextUpdateIndex++; // resume after the bit just handled, otherwise scan would find the same bit again
    }
    stats.clearDirty();
    return anyChanges;
}
system/jlib/jstats.cpp
Outdated
@@ -2183,7 +2191,8 @@ StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const | |||
SuperHashIteratorOf<CStatisticCollection> iter(children, false); | |||
for (iter.first(); iter.isValid(); iter.next()) | |||
iter.query().toXML(out); | |||
out.append("</Scope>\n"); | |||
out.append("</Scope> // Scope id=\""); | |||
id.getScopeText(out).append("\"\n"); |
Except it isn't valid XML any more. The information is available in the opening tag, so why duplicate it here?
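If a trailing annotation were kept purely for readability, it would need to use XML comment syntax to remain well-formed. A minimal sketch, assuming a hypothetical `closeScope` helper that returns the text rather than appending to a StringBuffer, and assuming scope ids never contain "--" (which is not allowed inside an XML comment):

```cpp
#include <string>

// Hypothetical helper: emit the closing tag followed by a well-formed XML comment
// naming the scope that was closed. Assumes scopeId does not contain "--".
inline std::string closeScope(const std::string & scopeId)
{
    return "</Scope> <!-- Scope id=\"" + scopeId + "\" -->\n";
}
```

This addresses only the syntax point; the question of whether the id should be repeated at all still stands.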
system/jlib/jstats.cpp
Outdated
const StatisticsMapping & mapping = totals.queryMapping(); | ||
bool updated = false; | ||
// if this scope is not dirty, the aggregates are accurate at this level so return totals (no need to descend) | ||
// Also if at sg scope, do not descend as aggregates does not need to be generated from below sg level |
trivial: "..as aggregates do not need.."
system/jlib/jstats.cpp
Outdated
@@ -2183,7 +2191,8 @@ StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const | |||
SuperHashIteratorOf<CStatisticCollection> iter(children, false); | |||
for (iter.first(); iter.isValid(); iter.next()) | |||
iter.query().toXML(out); | |||
out.append("</Scope>\n"); | |||
out.append("</Scope> // Scope id=\""); | |||
id.getScopeText(out).append("\"\n"); |
Unresolving this comment - missed fact that it is not a valid XML comment syntax, and Gavin's other Q re. duplicate info.
3185f97
to
63b3503
Compare
… running Signed-off-by: Shamser Ahmed <[email protected]>
63b3503
to
a74267b
Compare
Signed-off-by: Shamser Ahmed <[email protected]>
a74267b
to
9db0c5f
Compare
@ghalliday @jakesmith I have addressed most of the review comments. However, I haven't addressed keeping the existing gatherer code and having a separate Aggregator object. I think the reason for the current design of using a gatherer to write directly to the global stats object, is that I was
|
@shamser I think this is looking better. I am still not convinced by doubling up the structure for two purposes - but let's discuss before making any changes.
Linked<IStatisticCollection> statsCollection; | ||
}; | ||
|
||
WuScopeFilter filter; |
Yes it could be made more efficient by directly walking the stats list. If this works I wouldn't change it for this PR though - something to return to and simplify the code.
@@ -883,8 +912,11 @@ extern jlib_decl StatisticCreatorType queryCreatorType(const char * sct, Statist | |||
extern jlib_decl StatisticScopeType queryScopeType(const char * sst, StatisticScopeType dft); | |||
|
|||
extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope); | |||
extern jlib_decl IStatisticGatherer * createStatisticsGatherer(IStatisticCollection * stats); | |||
extern jlib_decl IStatisticCollection * createRootStatisticCollection(StatisticCreatorType creatorType, const char * creator, const StatsScopeId & rootScope, const StatsScopeId & graphScope, IStatisticCollection * sgCollection=nullptr); |
This looks wrong. A RootStatisticCollection is meant to have no parent. I think this is another result of mixing up gathering the statistics with calculating aggregates.
// Marking the collection dirty here is not ideal. It would be better to have a call to IStatisticCollection::setStatistic mark the scope as dirty.
// However, this would be inefficient as each call to IStatisticCollection::setStatistic would require the dirty flag to be set for all parent scopes.
sgScopeCollection->markDirty();
return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection);
This is confusing. It is adding a "Root" scope with an id of wfid underneath a scope of workflow/graph/subgraph. So effectively in the collection it is global->wfid->graph->subgraph->wfid->.
It makes me more convinced that it is a mistake to try and merge these two concepts.
It also means roxie works rather differently from eclagent/thor.
}
}

void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType)
// Prune all subgraph descendent stats (leaving subgraph stats for future aggregation)
void GlobalStatisticCollection::pruneSubGraphDescendants(unsigned wfid, const char *graphName, unsigned sgId)
- Copying the stats is trivial. You are copying ~4 stats from one structure to another.
- That can be solved in a different way. Saving a queue of blobs to commit.
Let's arrange a walkthrough to discuss before heading off and making any changes though.
scopes.append(*tos.ensureSubScope(scopeId, true));
}
virtual void beginChannelScope(unsigned id) override
{
StatsScopeId scopeId(SSTchannel, id);
CStatisticCollection & tos = scopes.tos();
IStatisticCollection & tos = scopes.tos();
There are still lots of unrequired changes from CStatisticCollection to IStatisticCollection throughout the file that complicate the compare.
if (value || includeStatisticIfZero(kind))
{
StringBuffer s;
(fWhenAggregateUpdated)(getFullScope(s).str(), queryScopeType(), kind, value);
trivial: I think the brackets around fWhenAggregateUpdated are unneeded and confusing.
}
if (updated)
// 1) Set any values that has changed for this scope and 2) update ALL totals for parent
const unsigned numStats = mapping.numStatistics();
trivial/style: unusual to make a scalar a const (e.g. not done elsewhere in this function).
StatisticsAggregatesWriter statsAggregatorWriter(wu, aggregateKindsMapping);
statsCollection->visit(statsAggregatorWriter);
}
} aggregateUpdatedCallBackFunc(wu);
It would be simpler to use a std::function, e.g.:
    AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
    {
        wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
    };
    statsCollection->refreshAggregates(aggregateKindsMapping, f);
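For illustration, here is a minimal self-contained sketch of that pattern outside the HPCC code base. Everything in it (`FakeWorkUnit`, `AggregateCallback`, `reportAggregates`, `demoCallback`, and the two small enums) is a hypothetical stand-in rather than a jlib or workunit type. It only shows how a lambda held in a `std::function` can capture the workunit by reference in place of a named functor class.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for the real scope-type and statistic-kind enums.
enum ScopeKind { KindGraph, KindSubgraph };
enum StatKind { StatSpillSize, StatCost };

// Callback signature, modelled loosely on the one in the comment above.
using AggregateCallback = std::function<void(const char * scope, ScopeKind sst, StatKind kind, uint64_t value)>;

// Hypothetical sink standing in for the workunit being updated.
struct FakeWorkUnit
{
    struct Entry { std::string scope; ScopeKind sst; StatKind kind; uint64_t value; };
    std::vector<Entry> written;

    void setStatistic(const char * scope, ScopeKind sst, StatKind kind, uint64_t value)
    {
        written.push_back(Entry{scope, sst, kind, value});
    }
};

// Stand-in for the refresh step: reports two changed aggregates via the callback.
inline void reportAggregates(const AggregateCallback & onUpdated)
{
    onUpdated("w1:graph1", KindGraph, StatSpillSize, 4096);
    onUpdated("w1:graph1", KindGraph, StatCost, 17);
}

// Builds the callback as a lambda and returns how many statistics were written.
inline std::size_t demoCallback(FakeWorkUnit & wu)
{
    // The lambda captures wu by reference, so no separate functor struct is needed.
    AggregateCallback f = [&wu](const char * scope, ScopeKind sst, StatKind kind, uint64_t value)
    {
        wu.setStatistic(scope, sst, kind, value);
    };
    reportAggregates(f);
    return wu.written.size();
}
```

The trade-off is the usual one: `std::function` adds a small indirection per call, which should not matter here because the callback only fires when an aggregate actually changes.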
{
// descend down to lower level to obtain totals required for aggregates and then aggregate
CRuntimeStatisticCollection childTotals(mapping);
Owned<IBitSet> childTotalUpdated = createBitSet(mapping.numStatistics());
I think it achieves the goal, but I am still finding the way it does so a bit confusing.
What I'd expect, I think, is for each level to do:
- If at level and not dirty, do nothing.
- If at level and is subgraph, do nothing.
- If dirty, recurse down each scope, and then merge the child values into new totals at current level.
- When finished recursing down each child scope and merging into new totals, update what has changed into current level stats, and push totals (via callback if necessary).
I had a stab at what that would look like, and I also think it should be done without adding the two quite implementation-orientated 'refreshAggregates' methods to IStatisticCollection:
bool refreshAggregates(IStatisticCollection &stats, const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated)
{
    if (stats.queryScopeType() == SSTsubgraph) // don't descend any further
        return false;
    if (!stats.isDirty())
        return false;
    bool anyChanges = false;
    CRuntimeStatisticCollection totals(mapping);
    Owned<IBitSet> updated = createBitSet(mapping.numStatistics());
    Owned<IStatisticCollectionIterator> childIter = &stats.getScopes(nullptr, false);
    ForEach(*childIter)
    {
        IStatisticCollection &child = childIter->query();
        refreshAggregates(child, mapping, fWhenAggregateUpdated);
        // merge in child level stats into current level, and mark which aggregate kinds have updated
        unsigned numChildStats = child.getNumStatistics();
        for (unsigned s=0; s<numChildStats; s++)
        {
            StatisticKind kind;
            unsigned __int64 value;
            child.getStatistic(kind, value, s);
            if (kind != (StatisticKind)(kind & StKindMask))
                continue; // ignore variants->not supported by CRuntimeStatisticCollection
            unsigned index = mapping.getIndex(kind);
            if (index != mapping.numStatistics())
            {
                totals.mergeStatistic(kind, value);
                updated->set(index); // NB: may already have been set whilst merging in another child, but that's okay.
            }
        }
    }
    // any updates? Set new totals into current level, and call callback
    unsigned nextUpdateIndex = 0;
    while (true)
    {
        nextUpdateIndex = updated->scan(nextUpdateIndex, true);
        if (NotFound == nextUpdateIndex)
            break;
        StatisticKind kind = mapping.getKind(nextUpdateIndex);
        unsigned __int64 value = totals.queryStatisticByIndex(nextUpdateIndex).get();
        if (stats.updateStatistic(kind, value, StatsMergeReplace))
        {
            if (value || includeStatisticIfZero(kind))
            {
                StringBuffer s;
                fWhenAggregateUpdated(stats.getFullScope(s).str(), stats.queryScopeType(), kind, value);
                anyChanges = true;
            }
        }
        // Advance past the bit just handled, otherwise the same bit is found again.
        // This assumes scan() includes the start position in its search.
        nextUpdateIndex++;
    }
    stats.clearDirty();
    return anyChanges;
}
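To make the control flow in the steps above easier to follow, here is a toy version that compiles on its own. It is only a sketch under stated assumptions: `Scope`, `Callback`, `markDirty` and this `refreshAggregates` are hypothetical simplifications, not the jlib `IStatisticCollection` API. Statistics are plain integers keyed by an `int` kind, every kind is summed, and subgraph scopes are treated as the inputs to aggregation.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical, simplified scope tree; not the real IStatisticCollection.
struct Scope
{
    std::string name;
    bool isSubgraph = false;
    bool dirty = false;
    Scope * parent = nullptr;
    std::map<int, uint64_t> stats;                 // stat kind -> value
    std::vector<std::unique_ptr<Scope>> children;

    Scope * addChild(const std::string & childName, bool subgraph)
    {
        children.push_back(std::unique_ptr<Scope>(new Scope()));
        Scope * child = children.back().get();
        child->name = childName;
        child->isSubgraph = subgraph;
        child->parent = this;
        return child;
    }

    // Mark this scope and every ancestor dirty. This walk up the tree is the
    // cost the diff comment mentions for marking dirty on every setStatistic.
    void markDirty()
    {
        for (Scope * cur = this; cur; cur = cur->parent)
            cur->dirty = true;
    }
};

using Callback = std::function<void(const std::string & scope, int kind, uint64_t value)>;

// Recompute totals for dirty scopes, bottom up; returns true if this level changed.
bool refreshAggregates(Scope & scope, const Callback & onUpdated)
{
    if (scope.isSubgraph)
    {
        scope.dirty = false;   // subgraph stats are inputs, nothing to aggregate below
        return false;
    }
    if (!scope.dirty)
        return false;          // nothing beneath this scope has changed

    // Refresh each child first, then merge its (now current) stats into new totals.
    std::map<int, uint64_t> totals;
    for (auto & child : scope.children)
    {
        refreshAggregates(*child, onUpdated);
        for (const auto & entry : child->stats)
            totals[entry.first] += entry.second;
    }

    // Store only the totals that differ, reporting each one via the callback.
    bool anyChanges = false;
    for (const auto & entry : totals)
    {
        auto found = scope.stats.find(entry.first);
        if (found == scope.stats.end() || found->second != entry.second)
        {
            scope.stats[entry.first] = entry.second;
            onUpdated(scope.name, entry.first, entry.second);
            anyChanges = true;
        }
    }
    scope.dirty = false;
    return anyChanges;
}
```

With two subgraphs under one graph, updating a single subgraph and calling `markDirty()` on it causes one callback per ancestor whose total changed, while a second refresh with no intervening updates returns immediately at the root.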