Skip to content

Commit

Permalink
HPCC-32138 Generic remapping merge function to remap disk stats to sp…
Browse files Browse the repository at this point in the history
…ill stats

Signed-off-by: Shamser Ahmed <[email protected]>
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
shamser authored and jakesmith committed Jul 26, 2024
1 parent 466476f commit da14967
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 59 deletions.
65 changes: 65 additions & 0 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "jmutex.hpp"
#include <vector>
#include <initializer_list>
#include <map>

#include "jstatcodes.h"

Expand Down Expand Up @@ -838,6 +839,21 @@ void mergeStat(CRuntimeStatisticCollection & stats, INTERFACE * source, Statisti
template <class INTERFACE>
void mergeStat(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, StatisticKind kind) { mergeStat(stats, source.get(), kind); }

// helper templates that add delta of previous vs current (from source) to tgtStats (and update prevStats)
template <class INTERFACE>
void updateStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, INTERFACE * source)
{
CRuntimeStatisticCollection curStats(tgtStats.queryMapping());
mergeStats(curStats, source);
prevStats.updateDelta(tgtStats, curStats); // NB: adds delta to tgtStats, and updates prevStats
}

template <class INTERFACE>
void updateStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, const Shared<INTERFACE> & source)
{
updateStatsDelta(tgtStats, prevStats, source.get());
}


//Some template helper classes for overwriting/setting statistics from external sources.

Expand Down Expand Up @@ -874,6 +890,55 @@ void setStat(CRuntimeStatisticCollection & stats, INTERFACE * source, StatisticK
template <class INTERFACE>
void setStat(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, StatisticKind kind) { setStat(stats, source.get(), kind); }


typedef std::map<StatisticKind, StatisticKind> StatKindMap;

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatisticsMapping & mapping, const StatKindMap & remaps)
{
if (!source)
return;
unsigned max = mapping.numStatistics();
for (unsigned i=0; i < max; i++)
{
StatisticKind kind = mapping.getKind(i);
if (remaps.find(kind) == remaps.end())
stats.mergeStatistic(kind, source->getStatistic(kind));
}
for (auto remap: remaps)
{
if (mapping.hasKind(remap.second))
stats.mergeStatistic(remap.second, source->getStatistic(remap.first));
}
}

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatKindMap & remaps)
{
mergeRemappedStats(stats, source, stats.queryMapping(), remaps);
}

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, const StatKindMap & remaps)
{
mergeRemappedStats(stats, source.get(), stats.queryMapping(), remaps);
}

template <class INTERFACE>
void updateRemappedStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, INTERFACE * source, const StatKindMap & remap)
{
CRuntimeStatisticCollection curStats(tgtStats.queryMapping());
::mergeRemappedStats(curStats, source, remap);
prevStats.updateDelta(tgtStats, curStats); // NB: adds delta to tgtStats, and updates prevStats
}

template <class INTERFACE>
void updateRemappedStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, const Shared<INTERFACE> & source, const StatKindMap & remap)
{
updateRemappedStatsDelta(tgtStats, prevStats, source.get(), remap);
}


//---------------------------------------------------------------------------------------------------------------------

//A class for minimizing the overhead of collecting timestamps.
Expand Down
21 changes: 3 additions & 18 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2760,26 +2760,11 @@ class CSpill : implements IRowWriter, public CSimpleInterface
::Release(writer);
writer = NULL;
spillFileIO->flush();
mergeStats(stats, this);
spillFile->noteSize(getStatistic(StSizeSpillFile));
mergeRemappedStats(stats, spillFileIO, diskToTempStatsMap);
stats.addStatistic(StNumSpills, 1);
spillFile->noteSize(spillFileIO->getStatistic(StSizeDiskWrite));
spillFileIO.clear();
}
inline __int64 getStatistic(StatisticKind kind) const
{
switch (kind)
{
case StSizeSpillFile:
return spillFileIO->getStatistic(StSizeDiskWrite);
case StTimeSortElapsed:
return spillFileIO->getStatistic(StTimeDiskWriteIO);
case StSizeDiskWrite:
return 0; // Return file size as StSizeSpillFile kind. To avoid confusion, StSizeDiskWrite will not be returned
case StNumSpills:
return 1;
default:
return spillFileIO->getStatistic(kind);
}
}
// IRowWriter
virtual void putRow(const void *row) override
{
Expand Down
53 changes: 12 additions & 41 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2150,21 +2150,9 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
switch (kind)
{
case StSizeSpillFile:
return tempFileIO->getStatistic(StSizeDiskWrite);
case StCycleDiskWriteIOCycles:
case StTimeDiskWriteIO:
case StSizeDiskWrite:
return 0;
case StNumSpills:
return 1;
case StTimeSpillElapsed:
return tempFileIO->getStatistic(StCycleDiskWriteIOCycles);
default:
return tempFileIO->getStatistic(kind);
}
if (kind==StNumSpills)
return 1;
return tempFileIO->getStatistic(kind);
}
};

Expand Down Expand Up @@ -2464,6 +2452,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
SharedRowStreamReaderOptions options;
size32_t inMemReadAheadGranularity = 0;
CRuntimeStatisticCollection inactiveStats;
CRuntimeStatisticCollection previousFileStats;
StringAttr baseTmpFilename;


Expand Down Expand Up @@ -2493,21 +2482,24 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
if (outputStream)
{
outputStream.clear();
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current
previousFileStats.reset();
iFileIO.clear();
outputStream.clear();
}
}
void createOutputStream()
{
closeWriter(); // Ensure stats from closing files are preserved in inactiveStats
// NB: Called once, when spilling starts.
tempFileOwner.setown(activity.createOwnedTempFile(baseTmpFilename));
auto res = createSerialOutputStream(&(tempFileOwner->queryIFile()), compressHandler, options, numOutputs + 1);
outputStream.setown(std::get<0>(res));
iFileIO.setown(std::get<1>(res));
totalInputRowsRead = inMemTotalRows;
inactiveStats.addStatistic(StNumSpills, 1);
}
void writeRowsFromInput()
{
Expand Down Expand Up @@ -2549,6 +2541,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand All @@ -2562,7 +2555,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *_baseTmpFilename, ICompressHandler *_compressHandler)
: activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), baseTmpFilename(_baseTmpFilename),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()),
inactiveStats(spillingWriteAheadStatistics)
inactiveStats(spillingWriteAheadStatistics), previousFileStats(spillingWriteAheadStatistics)
{
assertex(input);

Expand Down Expand Up @@ -2726,29 +2719,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
StatisticKind useKind;
switch (kind)
{
case StSizeSpillFile:
useKind = StSizeDiskWrite;
break;
case StCycleDiskWriteIOCycles:
case StTimeDiskWriteIO:
case StSizeDiskWrite:
return 0;
case StNumSpills:
return 1;
case StTimeSpillElapsed:
useKind = StCycleDiskWriteIOCycles;
break;
default:
useKind = kind;
}
unsigned __int64 v = 0;
if (likely(iFileIO))
v = iFileIO->getStatistic(useKind);
v += inactiveStats.getStatisticValue(useKind);
return v;
return inactiveStats.getStatisticValue(kind);
}
};

Expand Down
5 changes: 5 additions & 0 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemo
const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics);
const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics);

const StatKindMap diskToTempStatsMap
={ {StSizeDiskWrite, StSizeSpillFile},
{StTimeDiskWriteIO, StTimeSpillElapsed}
};

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT));
Expand Down
3 changes: 3 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ extern graph_decl const StatisticsMapping hashDistribActivityStatistics;
extern graph_decl const StatisticsMapping nsplitterActivityStatistics;
extern graph_decl const StatisticsMapping spillingWriteAheadStatistics;

// Maps disk related stats to spill stats
extern graph_decl const std::map<StatisticKind, StatisticKind> diskToTempStatsMap;

class BooleanOnOff
{
bool &tf;
Expand Down

0 comments on commit da14967

Please sign in to comment.