Skip to content

Commit

Permalink
HPCC-32138 Generic remapping merge function to remap disk stats to sp…
Browse files Browse the repository at this point in the history
…ill stats

Signed-off-by: Shamser Ahmed <[email protected]>

HPCC-32138 Changes following review

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jul 16, 2024
1 parent 52757b0 commit 0594162
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 58 deletions.
35 changes: 35 additions & 0 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "jmutex.hpp"
#include <vector>
#include <initializer_list>
#include <map>

#include "jstatcodes.h"

Expand Down Expand Up @@ -874,6 +875,40 @@ void setStat(CRuntimeStatisticCollection & stats, INTERFACE * source, StatisticK
template <class INTERFACE>
void setStat(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, StatisticKind kind) { setStat(stats, source.get(), kind); }


typedef std::map<StatisticKind, StatisticKind> StatKindMap;

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatisticsMapping & mapping, const StatKindMap & remaps)
{
if (!source)
return;
unsigned max = mapping.numStatistics();
for (unsigned i=0; i < max; i++)
{
StatisticKind kind = mapping.getKind(i);
if (remaps.find(kind) == remaps.end())
stats.mergeStatistic(kind, source->getStatistic(kind));
}
for (auto remap: remaps)
{
if (mapping.hasKind(remap.second))
stats.mergeStatistic(remap.second, source->getStatistic(remap.first));
}
}

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatKindMap & remaps)
{
mergeRemappedStats(stats, source, stats.queryMapping(), remaps);
}

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, const StatKindMap & remaps)
{
mergeRemappedStats(stats, source.get(), stats.queryMapping(), remaps);
}

//---------------------------------------------------------------------------------------------------------------------

//A class for minimizing the overhead of collecting timestamps.
Expand Down
21 changes: 3 additions & 18 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2760,26 +2760,11 @@ class CSpill : implements IRowWriter, public CSimpleInterface
::Release(writer);
writer = NULL;
spillFileIO->flush();
mergeStats(stats, this);
spillFile->noteSize(getStatistic(StSizeSpillFile));
mergeRemappedStats(stats, spillFileIO, diskToTempStatsMap);
stats.addStatistic(StNumSpills, 1);
spillFile->noteSize(spillFileIO->getStatistic(StSizeDiskWrite));
spillFileIO.clear();
}
inline __int64 getStatistic(StatisticKind kind) const
{
switch (kind)
{
case StSizeSpillFile:
return spillFileIO->getStatistic(StSizeDiskWrite);
case StTimeSortElapsed:
return spillFileIO->getStatistic(StTimeDiskWriteIO);
case StSizeDiskWrite:
return 0; // Return file size as StSizeSpillFile kind. To avoid confusion, StSizeDiskWrite will not be returned
case StNumSpills:
return 1;
default:
return spillFileIO->getStatistic(kind);
}
}
// IRowWriter
virtual void putRow(const void *row) override
{
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
{
PARENT::gatherActiveStats(activeStats);
if (sharedRowStream)
::mergeStats(activeStats, sharedRowStream);
mergeRemappedStats(activeStats, sharedRowStream, diskToTempStatsMap);
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
Expand Down
46 changes: 7 additions & 39 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2150,21 +2150,9 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
switch (kind)
{
case StSizeSpillFile:
return tempFileIO->getStatistic(StSizeDiskWrite);
case StCycleDiskWriteIOCycles:
case StTimeDiskWriteIO:
case StSizeDiskWrite:
return 0;
case StNumSpills:
return 1;
case StTimeSpillElapsed:
return tempFileIO->getStatistic(StCycleDiskWriteIOCycles);
default:
return tempFileIO->getStatistic(kind);
}
if (kind==StNumSpills)
return 1;
return tempFileIO->getStatistic(kind);
}
};

Expand Down Expand Up @@ -2493,11 +2481,11 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
if (outputStream)
{
outputStream.clear();
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
iFileIO.clear();
outputStream.clear();
}
}
void createOutputStream()
Expand All @@ -2508,6 +2496,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream.setown(std::get<0>(res));
iFileIO.setown(std::get<1>(res));
totalInputRowsRead = inMemTotalRows;
inactiveStats.addStatistic(StNumSpills, 1);
}
void writeRowsFromInput()
{
Expand Down Expand Up @@ -2549,6 +2538,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand Down Expand Up @@ -2726,29 +2716,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
StatisticKind useKind;
switch (kind)
{
case StSizeSpillFile:
useKind = StSizeDiskWrite;
break;
case StCycleDiskWriteIOCycles:
case StTimeDiskWriteIO:
case StSizeDiskWrite:
return 0;
case StNumSpills:
return 1;
case StTimeSpillElapsed:
useKind = StCycleDiskWriteIOCycles;
break;
default:
useKind = kind;
}
unsigned __int64 v = 0;
if (likely(iFileIO))
v = iFileIO->getStatistic(useKind);
v += inactiveStats.getStatisticValue(useKind);
return v;
return inactiveStats.getStatisticValue(kind);
}
};

Expand Down
5 changes: 5 additions & 0 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemo
const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics);
const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics);

const StatKindMap diskToTempStatsMap
={ {StSizeDiskWrite, StSizeSpillFile},
{StTimeDiskWriteIO, StTimeSpillElapsed}
};

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT));
Expand Down
3 changes: 3 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ extern graph_decl const StatisticsMapping hashDistribActivityStatistics;
extern graph_decl const StatisticsMapping nsplitterActivityStatistics;
extern graph_decl const StatisticsMapping spillingWriteAheadStatistics;

// Maps disk related stats to spill stats
extern graph_decl const std::map<StatisticKind, StatisticKind> diskToTempStatsMap;

class BooleanOnOff
{
bool &tf;
Expand Down

0 comments on commit 0594162

Please sign in to comment.