Skip to content

Commit

Permalink
HPCC-32017 review changes
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Jun 12, 2024
1 parent 19132da commit b892ad8
Showing 1 changed file with 36 additions and 22 deletions.
58 changes: 36 additions & 22 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,13 @@ ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned
}


// This implementation is supplied with the input, and reads from it on demand, initially to memory.
// It will spill to disk if the configurable memory limit is exceeded.
// The leading reader(output) causes the implementation to read more rows from the input.
// Once the leader causes the rows in memory to exceed the memory limit, it will cause a output stream to be created.
// From that point on, the leader will write blocks of rows out to disk,
// and cause all readers to read from it, once they have exhaused the in-memory row set.

class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader>
{
typedef std::vector<const void *> Rows;
Expand All @@ -1669,6 +1676,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
CSharedFullSpillingWriteAhead &owner;
unsigned whichOutput = 0;
size32_t localRowsIndex = 0;
rowcount_t lastKnownAvailable = 0;
rowcount_t currentRow = 0;
Rows rows;
Owned<IBufferedSerialInputStream> inputStream;
Expand All @@ -1690,9 +1698,9 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
const void *getRowFromStream()
{
if (currentRow == lastKnownWritten)
if (currentRow == lastKnownAvailable)
{
if (!owner.checkWriteAhead(lastKnownWritten))
if (!owner.checkWriteAhead(lastKnownAvailable))
{
eof = true;
return nullptr;
Expand All @@ -1714,8 +1722,6 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
return rowBuilder.finalizeRowClear(sz);
}
public:
rowcount_t lastKnownWritten = 0;

explicit COutputRowStream(CSharedFullSpillingWriteAhead &_owner, unsigned _whichOutput)
: owner(_owner), whichOutput(_whichOutput)
{
Expand All @@ -1724,6 +1730,14 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
freeRows();
}
rowcount_t queryLastKnownAvailable() const
{
return lastKnownAvailable;
}
void setLastKnownAvailable(rowcount_t _lastKnownWritten)
{
lastKnownAvailable = _lastKnownWritten;
}
void cancel()
{
eof = true;
Expand All @@ -1735,7 +1749,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
inputStream.clear();
eof = false;
currentRow = 0;
lastKnownWritten = 0;
lastKnownAvailable = 0;
}
virtual const void *nextRow() override
{
Expand All @@ -1753,7 +1767,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
localRowsIndex = 0;
rows.clear();

if (owner.getRowsInMem(rows, lastKnownWritten))
if (owner.getRowsInMem(rows, lastKnownAvailable))
{
if (rows.empty())
{
Expand All @@ -1779,7 +1793,10 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
freeRows();
ds.setStream(nullptr);
inputStream.clear();

// NB: this will set lastKnownAvailable to max[(rowcount_t)-1] (within owner.readAheadCS) to prevent it being considered as lowest any longer
owner.outputStopped(whichOutput);

eof = true;
}
};
Expand Down Expand Up @@ -1823,18 +1840,15 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader

CActivityBase &activity;
Linked<IRowStream> input;
IThorRowInterfaces *rowIf = nullptr;
Linked<IOutputMetaData> meta;
Linked<IOutputRowSerializer> serializer;
Linked<IOutputRowDeserializer> deserializer;
Linked<IEngineRowAllocator> allocator;
IOutputMetaData *serializeMeta = nullptr;
std::vector<Owned<COutputRowStream>> outputs;
std::deque<std::tuple<const void *, size32_t>> rows;
memsize_t rowsMemUsage = 0;
std::atomic<rowcount_t> totalInputRowsRead = 0; // not used until spilling begins, represents count of all rows read
rowcount_t inMemTotalRows = 0; // whilst in memory, represents count of all rows seen
CriticalSection crit;
CriticalSection readAheadCS; // ensure single reader (leader), reads ahead (updates rows/totalInputRowsRead/inMemTotalRows)
Owned<IFile> iFile;
Owned<IBufferedSerialOutputStream> outputStream;
Linked<ICompressHandler> compressHandler;
Expand All @@ -1845,12 +1859,13 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader

rowcount_t getLowestOutput()
{
// NB: must be called in crit
// NB: must be called with readAheadCS held
rowcount_t trailingRowPos = (rowcount_t)-1;
for (auto &output: outputs)
{
if (output->lastKnownWritten < trailingRowPos)
trailingRowPos = output->lastKnownWritten;
rowcount_t outputLastKnownWritten = output->queryLastKnownAvailable();
if (outputLastKnownWritten < trailingRowPos)
trailingRowPos = outputLastKnownWritten;
}
return trailingRowPos;
}
Expand All @@ -1870,7 +1885,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
void createOutputStream()
{
// NB: this flushes existing unread rows from memory. Called once, when spilling starts.
// NB: Called once, when spilling starts.
Owned<IFileIO> io = iFile->open(IFOcreate);
Owned<ISerialOutputStream> out = createSerialOutputStream(io);
outputStream.setown(createBufferedOutputStream(out, options.storageBlockSize)); //prefered plane block size
Expand Down Expand Up @@ -1933,10 +1948,9 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
ReleaseThorRow(std::get<0>(row));
}
public:
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *_rowIf, const char *tempFileName, ICompressHandler *_compressHandler)
: activity(*_activity), input(_input), inputGrouped(_inputGrouped), options(_options), rowIf(_rowIf), compressHandler(_compressHandler),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()),
deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta())
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler)
: activity(*_activity), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler),
serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer())
{
assertex(input);
assertex(options.inMemReadAheadGranularity < options.inMemMaxMem);
Expand All @@ -1954,8 +1968,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
void outputStopped(unsigned output)
{
// Mark finished output with max, so that it is not considered by getLowestOutput()
CriticalBlock b(crit);
outputs[output]->lastKnownWritten = (rowcount_t)-1;
CriticalBlock b(readAheadCS); // read ahead could be active and considering this output
outputs[output]->setLastKnownAvailable((rowcount_t)-1);

}
IBufferedSerialInputStream *getReadStream()
Expand All @@ -1976,7 +1990,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
if (totalInputRowsRead == outputRowsAvailable)
{
CriticalBlock b(crit);
CriticalBlock b(readAheadCS);
if (totalInputRowsRead == outputRowsAvailable) // if not, then since gaining the crit, totalInputRowsRead has changed
{
if (endOfInput)
Expand All @@ -1994,7 +2008,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
bool getRowsInMem(Rows &outputRows, rowcount_t &outputRowsAvailable)
{
CriticalBlock b(crit);
CriticalBlock b(readAheadCS);
if (outputRowsAvailable == inMemTotalRows) // load more
{
// prune unused rows
Expand Down

0 comments on commit b892ad8

Please sign in to comment.