diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 253d640c94d..e3c17798b2c 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -23,6 +23,7 @@ #include "jmutex.hpp" #include #include +#include #include "jstatcodes.h" @@ -874,6 +875,40 @@ void setStat(CRuntimeStatisticCollection & stats, INTERFACE * source, StatisticK template void setStat(CRuntimeStatisticCollection & stats, const Shared & source, StatisticKind kind) { setStat(stats, source.get(), kind); } + +typedef std::map StatKindMap; + +template +void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatisticsMapping & mapping, const StatKindMap & remaps) +{ + if (!source) + return; + unsigned max = mapping.numStatistics(); + for (unsigned i=0; i < max; i++) + { + StatisticKind kind = mapping.getKind(i); + if (remaps.find(kind) == remaps.end()) + stats.mergeStatistic(kind, source->getStatistic(kind)); + } + for (auto remap: remaps) + { + if (mapping.hasKind(remap.second)) + stats.mergeStatistic(remap.second, source->getStatistic(remap.first)); + } +} + +template +void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatKindMap & remaps) +{ + mergeRemappedStats(stats, source, stats.queryMapping(), remaps); +} + +template +void mergeRemappedStats(CRuntimeStatisticCollection & stats, const Shared & source, const StatKindMap & remaps) +{ + mergeRemappedStats(stats, source.get(), stats.queryMapping(), remaps); +} + //--------------------------------------------------------------------------------------------------------------------- //A class for minimizing the overhead of collecting timestamps. diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 61a49ced9a9..da214c0f16a 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -2760,26 +2760,11 @@ class CSpill : implements IRowWriter, public CSimpleInterface ::Release(writer); writer = NULL; spillFileIO->flush(); - mergeStats(stats, this); - spillFile->noteSize(getStatistic(StSizeSpillFile)); + mergeRemappedStats(stats, spillFileIO, diskToTempStatsMap); + stats.addStatistic(StNumSpills, 1); + spillFile->noteSize(spillFileIO->getStatistic(StSizeDiskWrite)); spillFileIO.clear(); } - inline __int64 getStatistic(StatisticKind kind) const - { - switch (kind) - { - case StSizeSpillFile: - return spillFileIO->getStatistic(StSizeDiskWrite); - case StTimeSortElapsed: - return spillFileIO->getStatistic(StTimeDiskWriteIO); - case StSizeDiskWrite: - return 0; // Return file size as StSizeSpillFile kind. To avoid confusion, StSizeDiskWrite will not be returned - case StNumSpills: - return 1; - default: - return spillFileIO->getStatistic(kind); - } - } // IRowWriter virtual void putRow(const void *row) override { diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index ca9ccd0d6df..488f1a582e4 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -405,7 +405,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf { PARENT::gatherActiveStats(activeStats); if (sharedRowStream) - ::mergeStats(activeStats, sharedRowStream); + mergeRemappedStats(activeStats, sharedRowStream, diskToTempStatsMap); } // ISharedSmartBufferCallback impl. virtual void paged() { pagedOut = true; } diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 42457777dae..ece18494073 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -2150,21 +2150,9 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase } virtual unsigned __int64 getStatistic(StatisticKind kind) const override { - switch (kind) - { - case StSizeSpillFile: - return tempFileIO->getStatistic(StSizeDiskWrite); - case StCycleDiskWriteIOCycles: - case StTimeDiskWriteIO: - case StSizeDiskWrite: - return 0; - case StNumSpills: - return 1; - case StTimeSpillElapsed: - return tempFileIO->getStatistic(StCycleDiskWriteIOCycles); - default: - return tempFileIO->getStatistic(kind); - } + if (kind==StNumSpills) + return 1; + return tempFileIO->getStatistic(kind); } }; @@ -2493,11 +2481,11 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); ::mergeStats(inactiveStats, iFileIO); iFileIO.clear(); - outputStream.clear(); } } void createOutputStream() @@ -2508,6 +2496,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf(res)); iFileIO.setown(std::get<1>(res)); totalInputRowsRead = inMemTotalRows; + inactiveStats.addStatistic(StNumSpills, 1); } void writeRowsFromInput() { @@ -2549,6 +2538,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); totalInputRowsRead.fetch_add(newRowsWritten); tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); + ::mergeStats(inactiveStats, iFileIO); // JCSMORE - could track size written, and start new file at this point (e.g. every 100MB), // and track their starting points (by row #) in a vector // We could then tell if/when the readers catch up, and remove consumed files as they do. @@ -2726,29 +2716,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfgetStatistic(useKind); - v += inactiveStats.getStatisticValue(useKind); - return v; + return inactiveStats.getStatisticValue(kind); } }; diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 09b3fc233af..32ab87ebf26 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -99,6 +99,11 @@ const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemo const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics); const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics); +const StatKindMap diskToTempStatsMap +={ {StSizeDiskWrite, StSizeSpillFile}, + {StTimeDiskWriteIO, StTimeSpillElapsed} + }; + MODULE_INIT(INIT_PRIORITY_STANDARD) { ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT)); diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index cf0b92bb4b0..a5db68707e5 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -170,6 +170,9 @@ extern graph_decl const StatisticsMapping hashDistribActivityStatistics; extern graph_decl const StatisticsMapping nsplitterActivityStatistics; extern graph_decl const StatisticsMapping spillingWriteAheadStatistics; +// Maps disk related stats to spill stats +extern graph_decl const std::map diskToTempStatsMap; + class BooleanOnOff { bool &tf;