Skip to content

Commit

Permalink
HPCC-32138 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jul 11, 2024
1 parent 0878095 commit 0a855ad
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 15 deletions.
9 changes: 6 additions & 3 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -875,8 +875,11 @@ void setStat(CRuntimeStatisticCollection & stats, INTERFACE * source, StatisticK
template <class INTERFACE>
void setStat(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, StatisticKind kind) { setStat(stats, source.get(), kind); }


typedef std::map<StatisticKind, StatisticKind> StatKindMap;

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatisticsMapping & mapping, const std::map<StatisticKind, StatisticKind> & remaps)
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatisticsMapping & mapping, const StatKindMap & remaps)
{
if (!source)
return;
Expand All @@ -895,13 +898,13 @@ void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source,
}

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const std::map<StatisticKind, StatisticKind> & remaps)
void mergeRemappedStats(CRuntimeStatisticCollection & stats, INTERFACE * source, const StatKindMap & remaps)
{
mergeRemappedStats(stats, source, stats.queryMapping(), remaps);
}

template <class INTERFACE>
void mergeRemappedStats(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, const std::map<StatisticKind, StatisticKind> & remaps)
void mergeRemappedStats(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, const StatKindMap & remaps)
{
mergeRemappedStats(stats, source.get(), stats.queryMapping(), remaps);
}
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2760,8 +2760,8 @@ class CSpill : implements IRowWriter, public CSimpleInterface
::Release(writer);
writer = NULL;
spillFileIO->flush();
mergeRemappedStats(stats, spillFileIO, diskToSpillStatsMap);
stats.setStatistic(StNumSpills, 1);
mergeRemappedStats(stats, spillFileIO, diskToTempStatsMap);
stats.addStatistic(StNumSpills, 1);
spillFile->noteSize(spillFileIO->getStatistic(StSizeDiskWrite));
spillFileIO.clear();
}
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
{
PARENT::gatherActiveStats(activeStats);
if (sharedRowStream)
mergeRemappedStats(activeStats, sharedRowStream, diskToSpillStatsMap);
mergeRemappedStats(activeStats, sharedRowStream, diskToTempStatsMap);
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
Expand Down
11 changes: 4 additions & 7 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2482,11 +2482,11 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
if (outputStream)
{
outputStream.clear();
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
iFileIO.clear();
outputStream.clear();
}
}
void createOutputStream()
Expand All @@ -2497,6 +2497,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream.setown(std::get<0>(res));
iFileIO.setown(std::get<1>(res));
totalInputRowsRead = inMemTotalRows;
inactiveStats.addStatistic(StNumSpills, 1);
}
void writeRowsFromInput()
{
Expand Down Expand Up @@ -2538,6 +2539,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand Down Expand Up @@ -2715,12 +2717,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
if (kind==StNumSpills)
return 1;
unsigned __int64 v = inactiveStats.getStatisticValue(kind);
if (likely(iFileIO))
v = iFileIO->getStatistic(kind);
return v;
return inactiveStats.getStatisticValue(kind);
}
};

Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemo
const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics);
const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics);

const std::map<StatisticKind, StatisticKind> diskToSpillStatsMap
const StatKindMap diskToTempStatsMap
={ {StSizeDiskWrite, StSizeSpillFile},
{StTimeDiskWriteIO, StTimeSpillElapsed}
};
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ extern graph_decl const StatisticsMapping nsplitterActivityStatistics;
extern graph_decl const StatisticsMapping spillingWriteAheadStatistics;

// Maps disk related stats to spill stats
extern graph_decl const std::map<StatisticKind, StatisticKind> diskToSpillStatsMap;
extern graph_decl const std::map<StatisticKind, StatisticKind> diskToTempStatsMap;

class BooleanOnOff
{
Expand Down

0 comments on commit 0a855ad

Please sign in to comment.