Skip to content

Commit

Permalink
HPCC-32138 Fix remapping issues
Browse files Browse the repository at this point in the history
And add updateStatsDelta and updateRemappedStatsDelta template
helpers

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Jul 26, 2024
1 parent a746d7f commit 342d6c6
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
30 changes: 30 additions & 0 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,21 @@ void mergeStat(CRuntimeStatisticCollection & stats, INTERFACE * source, Statisti
template <class INTERFACE>
void mergeStat(CRuntimeStatisticCollection & stats, const Shared<INTERFACE> & source, StatisticKind kind) { mergeStat(stats, source.get(), kind); }

// helper templates that add delta of previous vs current (from source) to tgtStats (and update prevStats)
template <class INTERFACE>
void updateStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, INTERFACE * source)
{
CRuntimeStatisticCollection curStats(tgtStats.queryMapping());
mergeStats(curStats, source);
prevStats.updateDelta(tgtStats, curStats); // NB: adds delta to tgtStats, and updates prevStats
}

template <class INTERFACE>
void updateStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, const Shared<INTERFACE> & source)
{
updateStatsDelta(tgtStats, prevStats, source.get());
}


//Some template helper classes for overwriting/setting statistics from external sources.

Expand Down Expand Up @@ -909,6 +924,21 @@ void mergeRemappedStats(CRuntimeStatisticCollection & stats, const Shared<INTERF
mergeRemappedStats(stats, source.get(), stats.queryMapping(), remaps);
}

template <class INTERFACE>
void updateRemappedStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, INTERFACE * source, const StatKindMap & remap)
{
CRuntimeStatisticCollection curStats(tgtStats.queryMapping());
::mergeRemappedStats(curStats, source, remap);
prevStats.updateDelta(tgtStats, curStats); // NB: adds delta to tgtStats, and updates prevStats
}

template <class INTERFACE>
void updateRemappedStatsDelta(CRuntimeStatisticCollection & tgtStats, CRuntimeStatisticCollection & prevStats, const Shared<INTERFACE> & source, const StatKindMap & remap)
{
updateRemappedStatsDelta(tgtStats, prevStats, source.get(), remap);
}


//---------------------------------------------------------------------------------------------------------------------

//A class for minimizing the overhead of collecting timestamps.
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
{
PARENT::gatherActiveStats(activeStats);
if (sharedRowStream)
mergeRemappedStats(activeStats, sharedRowStream, diskToTempStatsMap);
::mergeStats(activeStats, sharedRowStream);
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
Expand Down
8 changes: 2 additions & 6 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2485,9 +2485,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream.clear();
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
CRuntimeStatisticCollection currentFileStats(inactiveStats.queryMapping());
::mergeStats(currentFileStats, iFileIO);
previousFileStats.updateDelta(inactiveStats, currentFileStats);
updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current
previousFileStats.reset();
iFileIO.clear();
}
Expand Down Expand Up @@ -2543,9 +2541,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
CRuntimeStatisticCollection currentFileStats(inactiveStats.queryMapping());
::mergeStats(currentFileStats, iFileIO);
previousFileStats.updateDelta(inactiveStats, currentFileStats);
updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand Down

0 comments on commit 342d6c6

Please sign in to comment.