Skip to content

Commit

Permalink
HPCC-32138 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jul 19, 2024
1 parent 385d8f3 commit a746d7f
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
SharedRowStreamReaderOptions options;
size32_t inMemReadAheadGranularity = 0;
CRuntimeStatisticCollection inactiveStats;
CRuntimeStatisticCollection previousFileStats;
StringAttr baseTmpFilename;


Expand Down Expand Up @@ -2484,7 +2485,10 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream.clear();
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
CRuntimeStatisticCollection currentFileStats(inactiveStats.queryMapping());
::mergeStats(currentFileStats, iFileIO);
previousFileStats.updateDelta(inactiveStats, currentFileStats);
previousFileStats.reset();
iFileIO.clear();
}
}
Expand Down Expand Up @@ -2539,6 +2543,9 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
CRuntimeStatisticCollection currentFileStats(inactiveStats.queryMapping());
::mergeStats(currentFileStats, iFileIO);
previousFileStats.updateDelta(inactiveStats, currentFileStats);
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand All @@ -2552,7 +2559,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *_baseTmpFilename, ICompressHandler *_compressHandler)
: activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), baseTmpFilename(_baseTmpFilename),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()),
inactiveStats(spillingWriteAheadStatistics)
inactiveStats(spillingWriteAheadStatistics), previousFileStats(spillingWriteAheadStatistics)
{
assertex(input);

Expand Down Expand Up @@ -2716,10 +2723,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
unsigned __int64 v = inactiveStats.getStatisticValue(kind);
if (iFileIO)
v += iFileIO->getStatistic(kind);
return v;
return inactiveStats.getStatisticValue(kind);
}
};

Expand Down

0 comments on commit a746d7f

Please sign in to comment.