From ceef1ae5e7980ad85b8acbb7f3738d5ee3070348 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Fri, 13 Oct 2023 16:28:56 +0100 Subject: [PATCH] HPCC-29657 Changes following review Signed-off-by: Shamser Ahmed --- common/workunit/workunit.cpp | 27 ++++++++++---------- system/jlib/jstats.cpp | 48 ++++++++++++++++++++++++------------ system/jlib/jstats.h | 7 +++--- 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index ed309c46c75..454bdfc5157 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -2667,6 +2667,7 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu GlobalStatisticCollection::GlobalStatisticCollection() { + // Construct statsCollection here as call to GlobalStatisticCollection::load() is optional StatsScopeId globalScopeId(SSTglobal, (unsigned)0); statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); } @@ -2674,13 +2675,11 @@ GlobalStatisticCollection::GlobalStatisticCollection() void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * graphName, bool aggregatesOnly, bool missingScopesOnly) { const char * _wuid = workunit.queryWuid(); - if (strcmp(_wuid, wuid.str())!=0) // Make sure stats collection is for this workunit + if (!streq(_wuid, wuid.str())) // New statsCollection if collection for different workunit { - if (!wuid.isEmpty()) - { - StatsScopeId globalScopeId(SSTglobal, (unsigned)0); - statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); - } + // future: consider caching so that stats are not lost between jobs + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(nullptr, globalScopeId)); wuid.set(_wuid); } @@ -2695,7 +2694,7 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap StatsScopeId wfScopeId(SSTworkflow, graphPT->getPropInt("@wfid", 0)); StatsScopeId graphScopeId(SSTgraph, graphName); - Owned iter = graphPT->getElements("./*"); + Owned iter = graphPT->getElements("*"); ForEach(*iter) { StatsScopeId sgScopeId; @@ -2729,11 +2728,11 @@ void GlobalStatisticCollection::load(IConstWorkUnit &workunit, const char * grap int statsMinDepth = 0, statsMaxDepth = INT_MAX; if (aggregatesOnly) { + // Only store stats for subgraph level statsMinDepth = 3; statsMaxDepth = 3; } - // deserialize the subgraph stats, excluding any descendant stats - // Note: not all sg stats required for generating aggregates, so it would be + // future: not all sg stats required for generating aggregates, so consider if it is // more efficient to deserialize just the stats in the aggregateKinds list. statsCollection->deserializeChild(childId, serialized, version, statsMinDepth, statsMaxDepth); } @@ -2773,11 +2772,12 @@ void GlobalStatisticCollection::loadGlobalAggregates(IConstWorkUnit &workunit) } // getCollectionForUpdate() returns IStatisticCollection for the given rootScope -// if clearStats==true then the existing stats are cleared for the given scope +// if clearStats==true then the existing stats are cleared for the given scope and descendants IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(const StatsScopeId & wfScopeId, const StatsScopeId & graphScopeId, const StatsScopeId & sgScopeId, StatisticCreatorType creatorType, const char * creator, bool clearStats) { - IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}); - if (clearStats) + bool wasCreated; + IStatisticCollection * sgScopeCollection = statsCollection->ensureSubScopePath({wfScopeId,graphScopeId, sgScopeId}, &wasCreated); + if (clearStats && wasCreated) sgScopeCollection->clearStats(); sgScopeCollection->markDirty(); return createRootStatisticCollection(creatorType, creator, wfScopeId, graphScopeId, sgScopeCollection); @@ -2787,7 +2787,8 @@ IStatisticCollection * GlobalStatisticCollection::getCollectionForUpdate(const S bool GlobalStatisticCollection::refreshAggregates() { std::vector totals(aggregateKinds.size()); - return statsCollection->refreshAggregates(aggregateKinds, totals); + std::vector isTotalUpdated(aggregateKinds.size()); + return statsCollection->refreshAggregates(aggregateKinds, totals, isTotalUpdated); } // Refresh aggregates and write them to global stats diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 85c5bb9495c..ccf715756ad 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1799,8 +1799,6 @@ class CStatisticCollection : public CInterfaceOf } } public: - CStatisticCollection(IStatisticCollection * _parent=nullptr) : parent(_parent) {} - CStatisticCollection(IStatisticCollection * _parent, const StatsScopeId & _id) : id(_id), parent(_parent) { } @@ -1966,24 +1964,30 @@ class CStatisticCollection : public CInterfaceOf stats.append(s); return true; } - - virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) override + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) + { + bool wasCreated; + return ensureSubScope(search, hasChildren, &wasCreated); + } + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool * wasCreated) override { //Once the CStatisticCollection is created it should not be replaced - so that returned pointers remain valid. + *wasCreated = false; IStatisticCollection * match = children.find(&search); if (match) return match; IStatisticCollection * ret = new CStatisticCollection(this, search); children.add(*ret); + *wasCreated=true; return ret; } - virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path) override + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool * wasCreated) override { IStatisticCollection * curScope = this; for (auto it = path.begin(); it != path.end(); ++it) - curScope = curScope->ensureSubScope(*it, true); // n.b. this will always return a valid pointer + curScope = curScope->ensureSubScope(*it, true, wasCreated); // n.b. this will always return a valid pointer return curScope; } @@ -2021,7 +2025,7 @@ class CStatisticCollection : public CInterfaceOf stats.ensureCapacity(numStats); while (numStats-- > 0) { - Statistic next (in, version); + Statistic next(in, version); if (minDepth <= 0) stats.append(next); } @@ -2038,7 +2042,7 @@ class CStatisticCollection : public CInterfaceOf } } - virtual void deserializeChild(StatsScopeId childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override + virtual void deserializeChild(const StatsScopeId childId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) override { if (maxDepth > 0) { @@ -2078,7 +2082,7 @@ class CStatisticCollection : public CInterfaceOf cur.visit(visitor); } - virtual bool refreshAggregates(std::vector & aggregateKinds, std::vector & totals) override + virtual bool refreshAggregates(const std::vector & aggregateKinds, std::vector & totals, std::vector & isTotalUpdated) override { assertex(aggregateKinds.size()==totals.size()); @@ -2096,38 +2100,47 @@ class CStatisticCollection : public CInterfaceOf unsigned pos = iteratorVec-aggregateKinds.begin(); totals[pos] += mergeStatisticValue(totals[pos], stat.queryValue(), mergeAction); updated = true; + isTotalUpdated[pos]=true; } } } else { std::vector childTotals(aggregateKinds.size()); + std::vector isChildTotalUpdated(aggregateKinds.size()); // Every entry defaults false for (auto & child : children) { - if (child.refreshAggregates(aggregateKinds, childTotals)) + if (child.refreshAggregates(aggregateKinds, childTotals, isChildTotalUpdated)) updated = true; } if (updated) { - updated=false; std::vector::iterator totalIter = totals.begin(); std::vector::iterator subTotalIter = childTotals.begin(); - std::vector::iterator kindIter = aggregateKinds.begin(); + std::vector::const_iterator kindIter = aggregateKinds.begin(); + std::vector::const_iterator isChildTotalUpdatedIter = isChildTotalUpdated.begin(); + std::vector::iterator isTotalUpdatedIter = isTotalUpdated.begin(); while (totalIter != totals.end()) { - if (updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace)) - updated=true; - StatsMergeAction mergeAction = queryMergeMode(*kindIter); - (*totalIter) += mergeStatisticValue(*totalIter, *subTotalIter, mergeAction); + if (*isChildTotalUpdatedIter) + { + updateStatistic(*kindIter, *subTotalIter, StatsMergeReplace); + StatsMergeAction mergeAction = queryMergeMode(*kindIter); + *totalIter = mergeStatisticValue(*totalIter, *subTotalIter, mergeAction); + *isTotalUpdatedIter = true; + } ++totalIter; ++subTotalIter; ++kindIter; + ++isChildTotalUpdatedIter; + ++isTotalUpdatedIter; } } isDirty=false; } return updated; } + virtual void clearStats() override { stats.clear(true); @@ -2136,15 +2149,18 @@ class CStatisticCollection : public CInterfaceOf for (iter.first(); iter.isValid(); iter.next()) iter.query().clearStats(); } + virtual void addChild(IStatisticCollection *stats) override { children.add(*LINK(stats)); } + virtual void markDirty() override { isDirty=true; if (parent) parent->markDirty(); } + private: StatsScopeId id; IStatisticCollection * parent; diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 1504ebb46af..d49589cc388 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -137,13 +137,14 @@ interface IStatisticCollection : public IInterface virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) = 0; - virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path) = 0; + virtual IStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren, bool * wasCreated) = 0; + virtual IStatisticCollection * ensureSubScopePath(std::initializer_list path, bool * wasCreated) = 0; virtual IStatisticCollection * querySubScopePath(std::initializer_list path) = 0; virtual void addStatistic(StatisticKind kind, unsigned __int64 value) = 0; virtual bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) = 0; - virtual bool refreshAggregates(std::vector & aggregateKinds, std::vector & totals) = 0; + virtual bool refreshAggregates(const std::vector & aggregateKinds, std::vector & totals, std::vector & isKindUpdated) = 0; virtual void deserialize(MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; - virtual void deserializeChild(StatsScopeId scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; + virtual void deserializeChild(const StatsScopeId scopeId, MemoryBuffer & in, unsigned version, int minDepth, int maxDepth) = 0; virtual void clearStats() = 0; virtual void addChild(IStatisticCollection *stats) = 0; virtual void markDirty() = 0;