From da14967b506dc1e92c046560a919292da4894b81 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Thu, 27 Jun 2024 09:49:59 +0100 Subject: [PATCH] HPCC-32138 Generic remapping merge function to remap disk stats to spill stats Signed-off-by: Shamser Ahmed Signed-off-by: Jake Smith --- system/jlib/jstats.h | 65 +++++++++++++++++++ .../hashdistrib/thhashdistribslave.cpp | 21 +----- thorlcr/thorutil/thbuf.cpp | 53 ++++----------- thorlcr/thorutil/thormisc.cpp | 5 ++ thorlcr/thorutil/thormisc.hpp | 3 + 5 files changed, 88 insertions(+), 59 deletions(-) diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 253d640c94d..3c76fa60175 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -23,6 +23,7 @@ #include "jmutex.hpp" #include #include +#include #include "jstatcodes.h" @@ -838,6 +839,21 @@ void mergeStat(CRuntimeStatisticCollection & stats, INTERFACE * source, Statisti template void mergeStat(CRuntimeStatisticCollection & stats, const Shared & source, StatisticKind kind) { mergeStat(stats, source.get(), kind); } +// helper templates that add delta of previous vs current (from source) to tgtStats (and update prevStats) +template +void updateStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, INTERFACE * source) +{ + CRuntimeStatisticCollection curStats(tgtStats.queryMapping()); + mergeStats(curStats, source); + prevStats.updateDelta(tgtStats, curStats); // NB: adds delta to tgtStats, and updates prevStats +} + +template +void updateStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, const Shared & source) +{ + updateStatsDelta(tgtStats, prevStats, source.get()); +} + //Some template helper classes for overwriting/setting statistics from external sources. @@ -874,6 +890,55 @@ void setStat(CRuntimeStatisticCollection & stats, INTERFACE * source, StatisticK template void setStat(CRuntimeStatisticCollection & stats, const Shared & source, StatisticKind kind) { setStat(stats, source.get(), kind); } + +typedef std::map StatKindMap; + +template +void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatisticsMapping & mapping, const StatKindMap & remaps) +{ + if (!source) + return; + unsigned max = mapping.numStatistics(); + for (unsigned i=0; i < max; i++) + { + StatisticKind kind = mapping.getKind(i); + if (remaps.find(kind) == remaps.end()) + stats.mergeStatistic(kind, source->getStatistic(kind)); + } + for (auto remap: remaps) + { + if (mapping.hasKind(remap.second)) + stats.mergeStatistic(remap.second, source->getStatistic(remap.first)); + } +} + +template +void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatKindMap & remaps) +{ + mergeRemappedStats(stats, source, stats.queryMapping(), remaps); +} + +template +void mergeRemappedStats(CRuntimeStatisticCollection & stats, const Shared & source, const StatKindMap & remaps) +{ + mergeRemappedStats(stats, source.get(), stats.queryMapping(), remaps); +} + +template +void updateRemappedStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, INTERFACE * source, const StatKindMap & remap) +{ + CRuntimeStatisticCollection curStats(tgtStats.queryMapping()); + ::mergeRemappedStats(curStats, source, remap); + prevStats.updateDelta(tgtStats, curStats); // NB: adds delta to tgtStats, and updates prevStats +} + +template +void updateRemappedStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, const Shared & source, const StatKindMap & remap) +{ + updateRemappedStatsDelta(tgtStats, prevStats, source.get(), remap); +} + + //--------------------------------------------------------------------------------------------------------------------- //A class for minimizing the overhead of collecting timestamps. diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 61a49ced9a9..da214c0f16a 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -2760,26 +2760,11 @@ class CSpill : implements IRowWriter, public CSimpleInterface ::Release(writer); writer = NULL; spillFileIO->flush(); - mergeStats(stats, this); - spillFile->noteSize(getStatistic(StSizeSpillFile)); + mergeRemappedStats(stats, spillFileIO, diskToTempStatsMap); + stats.addStatistic(StNumSpills, 1); + spillFile->noteSize(spillFileIO->getStatistic(StSizeDiskWrite)); spillFileIO.clear(); } - inline __int64 getStatistic(StatisticKind kind) const - { - switch (kind) - { - case StSizeSpillFile: - return spillFileIO->getStatistic(StSizeDiskWrite); - case StTimeSortElapsed: - return spillFileIO->getStatistic(StTimeDiskWriteIO); - case StSizeDiskWrite: - return 0; // Return file size as StSizeSpillFile kind. To avoid confusion, StSizeDiskWrite will not be returned - case StNumSpills: - return 1; - default: - return spillFileIO->getStatistic(kind); - } - } // IRowWriter virtual void putRow(const void *row) override { diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 42457777dae..dffd5dc39b1 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -2150,21 +2150,9 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase } virtual unsigned __int64 getStatistic(StatisticKind kind) const override { - switch (kind) - { - case StSizeSpillFile: - return tempFileIO->getStatistic(StSizeDiskWrite); - case StCycleDiskWriteIOCycles: - case StTimeDiskWriteIO: - case StSizeDiskWrite: - return 0; - case StNumSpills: - return 1; - case StTimeSpillElapsed: - return tempFileIO->getStatistic(StCycleDiskWriteIOCycles); - default: - return tempFileIO->getStatistic(kind); - } + if (kind==StNumSpills) + return 1; + return tempFileIO->getStatistic(kind); } }; @@ -2464,6 +2452,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); - ::mergeStats(inactiveStats, iFileIO); + updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current + previousFileStats.reset(); iFileIO.clear(); - outputStream.clear(); } } void createOutputStream() { + closeWriter(); // Ensure stats from closing files are preserved in inactiveStats // NB: Called once, when spilling starts. tempFileOwner.setown(activity.createOwnedTempFile(baseTmpFilename)); auto res = createSerialOutputStream(&(tempFileOwner->queryIFile()), compressHandler, options, numOutputs + 1); outputStream.setown(std::get<0>(res)); iFileIO.setown(std::get<1>(res)); totalInputRowsRead = inMemTotalRows; + inactiveStats.addStatistic(StNumSpills, 1); } void writeRowsFromInput() { @@ -2549,6 +2541,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); totalInputRowsRead.fetch_add(newRowsWritten); tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); + updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current // JCSMORE - could track size written, and start new file at this point (e.g. every 100MB), // and track their starting points (by row #) in a vector // We could then tell if/when the readers catch up, and remove consumed files as they do. @@ -2562,7 +2555,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfqueryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), - inactiveStats(spillingWriteAheadStatistics) + inactiveStats(spillingWriteAheadStatistics), previousFileStats(spillingWriteAheadStatistics) { assertex(input); @@ -2726,29 +2719,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfgetStatistic(useKind); - v += inactiveStats.getStatisticValue(useKind); - return v; + return inactiveStats.getStatisticValue(kind); } }; diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 09b3fc233af..32ab87ebf26 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -99,6 +99,11 @@ const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemo const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics); const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics); +const StatKindMap diskToTempStatsMap +={ {StSizeDiskWrite, StSizeSpillFile}, + {StTimeDiskWriteIO, StTimeSpillElapsed} + }; + MODULE_INIT(INIT_PRIORITY_STANDARD) { ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT)); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index cf0b92bb4b0..a5db68707e5 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -170,6 +170,9 @@ extern graph_decl const StatisticsMapping hashDistribActivityStatistics; extern graph_decl const StatisticsMapping nsplitterActivityStatistics; extern graph_decl const StatisticsMapping spillingWriteAheadStatistics; +// Maps disk related stats to spill stats +extern graph_decl const std::map diskToTempStatsMap; + class BooleanOnOff { bool &tf;