Skip to content

Commit

Permalink
HPCC-32000 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jun 27, 2024
1 parent 5eae32f commit 58117e1
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2465,6 +2465,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
SharedRowStreamReaderOptions options;
size32_t inMemReadAheadGranularity = 0;
CRuntimeStatisticCollection inactiveStats;
StringAttr baseTmpFilename;


rowcount_t getLowestOutput()
Expand Down Expand Up @@ -2503,6 +2504,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
void createOutputStream()
{
// NB: Called once, when spilling starts.
tempFileOwner.setown(activity.createOwnedTempFile(baseTmpFilename));
auto res = createSerialOutputStream(&(tempFileOwner->queryIFile()), compressHandler, options, numOutputs + 1);
outputStream.setown(std::get<0>(res));
iFileIO.setown(std::get<1>(res));
Expand Down Expand Up @@ -2558,8 +2560,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
ReleaseThorRow(std::get<0>(row));
}
public:
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler)
: activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler),
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *_baseTmpFilename, ICompressHandler *_compressHandler)
: activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), baseTmpFilename(_baseTmpFilename),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()),
inactiveStats(spillingWriteAheadStatistics)
{
Expand All @@ -2572,7 +2574,6 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader

for (unsigned o=0; o<numOutputs; o++)
outputs.push_back(new COutputRowStream(*this, o));
tempFileOwner.setown(activity.createOwnedTempFile(tempFileName));
}
~CSharedFullSpillingWriteAhead()
{
Expand All @@ -2597,7 +2598,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
getFileIOStats(tracing, iFileIO);
activity.ActPrintLog("CSharedFullSpillingWriteAhead::outputStopped closing tempfile writer: %s %s", tempFileOwner->queryIFile().queryFilename(), tracing.str());
closeWriter();
tempFileOwner->noteSize(0);
tempFileOwner.clear();
}
}
}
Expand Down Expand Up @@ -2650,8 +2651,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
if (rowsMemUsage >= options.inMemMaxMem) // too much in memory, spill
{
// NB: this will reset rowMemUsage, however, each reader will continue to consume rows until they catch up (or stop)
ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", tempFileOwner->queryIFile().queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size());
createOutputStream();
ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", tempFileOwner->queryIFile().queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size());
return false;
}

Expand Down Expand Up @@ -2714,7 +2715,6 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
virtual void reset() override
{
closeWriter();
tempFileOwner->noteSize(0);
for (auto &output: outputs)
output->reset();
freeRows();
Expand Down

0 comments on commit 58117e1

Please sign in to comment.