Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32138 Generic remapping merge function to remap disk stats to spill stats #18819

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "jmutex.hpp"
#include <vector>
#include <initializer_list>
#include <map>

#include "jstatcodes.h"

Expand Down Expand Up @@ -838,6 +839,21 @@ void mergeStat(CRuntimeStatisticCollection & stats, INTERFACE * source, Statisti
// Shared<> convenience overload of mergeStat: hands the pointer held by 'source' to the raw-pointer version.
template <class INTERFACE>
void mergeStat(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, StatisticKind kind)
{
    mergeStat(stats, source.get(), kind);
}

// Delta helper: gathers the current statistics from 'source', adds the difference between them and
// the previously recorded values to tgtStats, and records the current values in prevStats.
template <class INTERFACE>
void updateStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, INTERFACE * source)
{
    // Temporary collection sharing the target's mapping, filled with the source's current values
    CRuntimeStatisticCollection snapshot(tgtStats.queryMapping());
    mergeStats(snapshot, source);

    // NB: updateDelta adds the delta to tgtStats, and updates prevStats
    prevStats.updateDelta(tgtStats, snapshot);
}

// Shared<> convenience overload of updateStatsDelta: forwards the pointer held by 'source'.
template <class INTERFACE>
void updateStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, const Shared<INTERFACE> & source)
{
    auto * heldSource = source.get();
    updateStatsDelta(tgtStats, prevStats, heldSource);
}


//Some template helper classes for overwriting/setting statistics from external sources.

Expand Down Expand Up @@ -874,6 +890,55 @@ void setStat(CRuntimeStatisticCollection & stats, INTERFACE * source, StatisticK
// Shared<> convenience overload of setStat: hands the pointer held by 'source' to the raw-pointer version.
template <class INTERFACE>
void setStat(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, StatisticKind kind)
{
    setStat(stats, source.get(), kind);
}


// Remapping table for statistics: key = kind read from the source, value = kind it is recorded under
using StatKindMap = std::map<StatisticKind, StatisticKind>;

// Merges statistics read from 'source' into 'stats', redirecting some kinds on the way.
//   mapping - the set of kinds to consider
//   remaps  - key = kind to read from the source, value = kind to merge the value under
// Kinds in 'mapping' that are not keys of 'remaps' are merged under their own kind.
// A remap entry is only applied when its target kind is present in 'mapping'.
// Does nothing if 'source' is null.
template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatisticsMapping & mapping, const StatKindMap & remaps)
{
    if (nullptr == source)
        return;

    // Pass 1: kinds that are not remap sources keep their own kind
    const unsigned numKinds = mapping.numStatistics();
    for (unsigned idx = 0; idx < numKinds; ++idx)
    {
        const StatisticKind ownKind = mapping.getKind(idx);
        if (remaps.count(ownKind) != 0)
            continue; // handled by the remap pass below
        stats.mergeStatistic(ownKind, source->getStatistic(ownKind));
    }

    // Pass 2: read each remap's source kind and merge it under the target kind
    for (const auto & entry : remaps)
    {
        const StatisticKind fromKind = entry.first;
        const StatisticKind toKind = entry.second;
        if (!mapping.hasKind(toKind))
            continue;
        stats.mergeStatistic(toKind, source->getStatistic(fromKind));
    }
}

// Overload of mergeRemappedStats that uses the target collection's own mapping as the set of kinds to consider.
template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatKindMap & remaps)
{
    const auto & targetMapping = stats.queryMapping();
    mergeRemappedStats(stats, source, targetMapping, remaps);
}

// Shared<> convenience overload of mergeRemappedStats: forwards the pointer held by 'source',
// using the target collection's own mapping as the set of kinds to consider.
template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, const StatKindMap & remaps)
{
    auto * heldSource = source.get();
    mergeRemappedStats(stats, heldSource, stats.queryMapping(), remaps);
}

// Remapping variant of the delta helper: gathers the current (remapped) statistics from 'source',
// adds the difference between them and the previously recorded values to tgtStats, and records the
// current values in prevStats.
template <class INTERFACE>
void updateRemappedStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, INTERFACE * source, const StatKindMap & remap)
{
    // Temporary collection sharing the target's mapping, filled with the source's remapped values
    CRuntimeStatisticCollection remappedNow(tgtStats.queryMapping());
    ::mergeRemappedStats(remappedNow, source, remap);

    // NB: updateDelta adds the delta to tgtStats, and updates prevStats
    prevStats.updateDelta(tgtStats, remappedNow);
}

// Shared<> convenience overload of updateRemappedStatsDelta: forwards the pointer held by 'source'.
template <class INTERFACE>
void updateRemappedStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, const Shared<INTERFACE> & source, const StatKindMap & remap)
{
    auto * heldSource = source.get();
    updateRemappedStatsDelta(tgtStats, prevStats, heldSource, remap);
}


//---------------------------------------------------------------------------------------------------------------------

//A class for minimizing the overhead of collecting timestamps.
Expand Down
21 changes: 3 additions & 18 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2760,26 +2760,11 @@ class CSpill : implements IRowWriter, public CSimpleInterface
::Release(writer);
writer = NULL;
spillFileIO->flush();
mergeStats(stats, this);
spillFile->noteSize(getStatistic(StSizeSpillFile));
mergeRemappedStats(stats, spillFileIO, diskToTempStatsMap);
stats.addStatistic(StNumSpills, 1);
spillFile->noteSize(spillFileIO->getStatistic(StSizeDiskWrite));
spillFileIO.clear();
}
// Returns the value of the requested statistic, answered from spillFileIO with some kinds translated:
//   StSizeSpillFile   <- spillFileIO's StSizeDiskWrite
//   StTimeSortElapsed <- spillFileIO's StTimeDiskWriteIO
//   StSizeDiskWrite   -> always 0 (the size is reported as StSizeSpillFile instead)
//   StNumSpills       -> always 1
// Any other kind is passed straight through to spillFileIO.
// NOTE(review): spillFileIO is dereferenced without a null check - callers must invoke this before it is cleared.
inline __int64 getStatistic(StatisticKind kind) const
{
switch (kind)
{
case StSizeSpillFile:
return spillFileIO->getStatistic(StSizeDiskWrite);
case StTimeSortElapsed:
return spillFileIO->getStatistic(StTimeDiskWriteIO);
case StSizeDiskWrite:
return 0; // Return file size as StSizeSpillFile kind. To avoid confusion, StSizeDiskWrite will not be returned
case StNumSpills:
return 1;
default:
return spillFileIO->getStatistic(kind);
}
}
// IRowWriter
virtual void putRow(const void *row) override
{
Expand Down
53 changes: 12 additions & 41 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2150,21 +2150,9 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
switch (kind)
{
case StSizeSpillFile:
return tempFileIO->getStatistic(StSizeDiskWrite);
case StCycleDiskWriteIOCycles:
case StTimeDiskWriteIO:
case StSizeDiskWrite:
return 0;
case StNumSpills:
return 1;
case StTimeSpillElapsed:
return tempFileIO->getStatistic(StCycleDiskWriteIOCycles);
default:
return tempFileIO->getStatistic(kind);
}
if (kind==StNumSpills)
return 1;
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
return tempFileIO->getStatistic(kind);
}
};

Expand Down Expand Up @@ -2464,6 +2452,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
SharedRowStreamReaderOptions options;
size32_t inMemReadAheadGranularity = 0;
CRuntimeStatisticCollection inactiveStats;
CRuntimeStatisticCollection previousFileStats;
StringAttr baseTmpFilename;


Expand Down Expand Up @@ -2493,21 +2482,24 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
if (outputStream)
{
outputStream.clear();
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current
previousFileStats.reset();
iFileIO.clear();
outputStream.clear();
}
}
void createOutputStream()
{
closeWriter(); // Ensure stats from closing files are preserved in inactiveStats
// NB: Called once, when spilling starts.
tempFileOwner.setown(activity.createOwnedTempFile(baseTmpFilename));
auto res = createSerialOutputStream(&(tempFileOwner->queryIFile()), compressHandler, options, numOutputs + 1);
outputStream.setown(std::get<0>(res));
iFileIO.setown(std::get<1>(res));
totalInputRowsRead = inMemTotalRows;
inactiveStats.addStatistic(StNumSpills, 1);
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
}
void writeRowsFromInput()
{
Expand Down Expand Up @@ -2549,6 +2541,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand All @@ -2562,7 +2555,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *_baseTmpFilename, ICompressHandler *_compressHandler)
: activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), baseTmpFilename(_baseTmpFilename),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()),
inactiveStats(spillingWriteAheadStatistics)
inactiveStats(spillingWriteAheadStatistics), previousFileStats(spillingWriteAheadStatistics)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't quite the right mapping - it shouldn't include StSizePeakTempDisk, but it is good enough for this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is changed in the other pending PR, which will therefore need rebasing once this one is merged.

{
assertex(input);

Expand Down Expand Up @@ -2726,29 +2719,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
StatisticKind useKind;
switch (kind)
{
case StSizeSpillFile:
useKind = StSizeDiskWrite;
break;
case StCycleDiskWriteIOCycles:
case StTimeDiskWriteIO:
case StSizeDiskWrite:
return 0;
case StNumSpills:
return 1;
case StTimeSpillElapsed:
useKind = StCycleDiskWriteIOCycles;
break;
default:
useKind = kind;
}
unsigned __int64 v = 0;
if (likely(iFileIO))
v = iFileIO->getStatistic(useKind);
v += inactiveStats.getStatisticValue(useKind);
return v;
return inactiveStats.getStatisticValue(kind);
}
};

Expand Down
5 changes: 5 additions & 0 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemo
const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics);
const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics);

const StatKindMap diskToTempStatsMap
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should ideally have a comment to indicate which is the source and which is the target

={ {StSizeDiskWrite, StSizeSpillFile},
{StTimeDiskWriteIO, StTimeSpillElapsed}
jakesmith marked this conversation as resolved.
Show resolved Hide resolved
};

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT));
Expand Down
3 changes: 3 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ extern graph_decl const StatisticsMapping hashDistribActivityStatistics;
extern graph_decl const StatisticsMapping nsplitterActivityStatistics;
extern graph_decl const StatisticsMapping spillingWriteAheadStatistics;

// Maps disk related stats to spill stats
// Each entry is { source, target }: the key is the disk stat kind read from the source,
// the value is the spill stat kind that the value is merged under (see mergeRemappedStats).
extern graph_decl const std::map<StatisticKind, StatisticKind> diskToTempStatsMap;

class BooleanOnOff
{
bool &tf;
Expand Down
Loading