From 2116ab4117441615724e8bc4fda656578a2f2315 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Tue, 4 Jun 2024 22:36:59 +0100 Subject: [PATCH] HPCC-32017 New compressing splitter implementation Signed-off-by: Jake Smith --- rtl/eclrtl/rtlread_imp.hpp | 29 + system/jlib/jutil.cpp | 7 +- system/jlib/jutil.hpp | 2 - .../activities/nsplitter/thnsplitterslave.cpp | 88 ++- thorlcr/thorutil/thbuf.cpp | 530 +++++++++++++++++- thorlcr/thorutil/thbuf.hpp | 32 +- thorlcr/thorutil/thormisc.hpp | 5 + 7 files changed, 650 insertions(+), 43 deletions(-) diff --git a/rtl/eclrtl/rtlread_imp.hpp b/rtl/eclrtl/rtlread_imp.hpp index afa1ba7e0a3..a475e533651 100644 --- a/rtl/eclrtl/rtlread_imp.hpp +++ b/rtl/eclrtl/rtlread_imp.hpp @@ -94,5 +94,34 @@ class ECLRTL_API CThorStreamDeserializerSource : implements IRowDeserializerSour Linked in; // could use a CStreamSerializer class (with inlines to improve) }; +class ECLRTL_API COutputStreamSerializer : public CSimpleInterfaceOf +{ + Linked outputStream; + unsigned nesting = 0; + offset_t outerNestingOffset = 0; + +public: + COutputStreamSerializer(IBufferedSerialOutputStream *_outputStream) : outputStream(_outputStream) + { + } + virtual void put(size32_t len, const void * ptr) override + { + outputStream->put(len, ptr); + } + virtual size32_t beginNested(size32_t count) override + { + outputStream->suspend(sizeof(size32_t)); + if (nesting++ == 0) + outerNestingOffset = outputStream->tell(); + return outputStream->tell()-outerNestingOffset; + } + virtual void endNested(size32_t delta) override + { + size32_t patchedLength = outputStream->tell() - (delta + outerNestingOffset); + outputStream->resume(sizeof(size32_t), &patchedLength); + nesting--; + } +}; + #endif diff --git a/system/jlib/jutil.cpp b/system/jlib/jutil.cpp index dbed0920d38..d5c8acb8917 100644 --- a/system/jlib/jutil.cpp +++ b/system/jlib/jutil.cpp @@ -2759,10 +2759,10 @@ StringBuffer &getFileAccessUrl(StringBuffer &out) return out; } - -#ifdef _CONTAINERIZED bool getDefaultPlane(StringBuffer &ret, const char * componentOption, const char * category) { + if (!isContainerized()) + throwUnexpectedX("getDefaultPlane() called from non-container system"); // If the plane is specified for the component, then use that if (getComponentConfigSP()->getProp(componentOption, ret)) return true; @@ -2780,8 +2780,11 @@ bool getDefaultPlane(StringBuffer &ret, const char * componentOption, const char return false; } +#ifdef _CONTAINERIZED static bool getDefaultPlaneDirectory(StringBuffer &ret, const char * componentOption, const char * category) { + if (!isContainerized()) + throwUnexpectedX("getDefaultPlaneDirectory() called from non-container system"); StringBuffer planeName; if (!getDefaultPlane(planeName, componentOption, category)) return false; diff --git a/system/jlib/jutil.hpp b/system/jlib/jutil.hpp index 445899965be..3bbe204ac15 100644 --- a/system/jlib/jutil.hpp +++ b/system/jlib/jutil.hpp @@ -657,9 +657,7 @@ extern jlib_decl bool checkCreateDaemon(unsigned argc, const char * * argv); //Createpassword of specified length, containing UpperCaseAlphas, LowercaseAlphas, numerics and symbols extern jlib_decl const char * generatePassword(StringBuffer &pwd, int pwdLen); -#ifdef _CONTAINERIZED extern jlib_decl bool getDefaultPlane(StringBuffer &ret, const char * componentOption, const char * category); -#endif extern jlib_decl void getResourceFromJfrog(StringBuffer &localPath, IPropertyTree &item); diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp 
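The new COutputStreamSerializer above adapts an IBufferedSerialOutputStream into a row serializer target: beginNested() reserves a size32_t slot via suspend(), records where the nested payload starts relative to the outermost suspension, and endNested() back-patches that slot via resume() with the number of bytes written since. What follows is a minimal standalone sketch of that reserve-and-back-patch pattern, modelled with a plain memory buffer instead of the HPCC stream classes; every name in it is hypothetical.

// Sketch only: reserve a 32-bit length prefix, write the nested payload, then
// back-patch the prefix once the nested scope ends. COutputStreamSerializer achieves
// the same effect on a stream via suspend()/tell()/resume().
#include <cstdint>
#include <cstring>
#include <vector>

class SketchNestedWriter
{
    std::vector<uint8_t> buf;
public:
    void put(size_t len, const void *ptr)
    {
        const uint8_t *p = static_cast<const uint8_t *>(ptr);
        buf.insert(buf.end(), p, p + len);
    }
    size_t beginNested()                 // returns offset of the payload that follows the reserved prefix
    {
        uint32_t placeholder = 0;
        put(sizeof(placeholder), &placeholder);
        return buf.size();
    }
    void endNested(size_t payloadStart)  // patch the reserved prefix with the payload length
    {
        uint32_t len = static_cast<uint32_t>(buf.size() - payloadStart);
        std::memcpy(&buf[payloadStart - sizeof(uint32_t)], &len, sizeof(len));
    }
    const std::vector<uint8_t> &data() const { return buf; }
};

The real class cannot patch an arbitrary earlier byte of a stream that may already have been flushed, which is why it uses suspend()/resume() and tracks nested offsets relative to the outermost suspension rather than writing into a byte array as the sketch does.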
index 56c5dbe0c91..a1767a5a3dd 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -30,6 +30,7 @@ class CSplitterOutput : public CSimpleInterfaceOf, pu NSplitterSlaveActivity &activity; Semaphore writeBlockSem; bool started = false, stopped = false; + IRowStream *splitterStream = nullptr; unsigned outIdx; rowcount_t rec = 0, max = 0; @@ -43,6 +44,7 @@ class CSplitterOutput : public CSimpleInterfaceOf, pu { started = stopped = false; rec = max = 0; + splitterStream = nullptr; } inline bool isStopped() const { return stopped; } @@ -80,6 +82,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf typedef CSlaveActivity PARENT; bool spill = false; + bool newSplitter = false; bool eofHit = false; bool writeBlocked = false, pagedOut = false; CriticalSection connectLock, prepareInputLock, writeAheadCrit; @@ -91,7 +94,8 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf unsigned connectedOutputCount = (unsigned)-1; // uninitialized rowcount_t recsReady = 0; Owned writeAheadException; - Owned smartBuf; + Owned sharedRowStream; + Owned sharedSmartRowWriter; bool inputPrepared = false; bool inputConnected = false; unsigned numOutputs = 0; @@ -161,6 +165,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf spill = dV>0; ForEachItemIn(o, container.outputs) appendOutput(new CSplitterOutput(*this, o)); + newSplitter = getOptBool("newsplitter", false); + if (getOptBool("forcenewsplitter", false)) + { + newSplitter = true; + spill = true; + } } virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override { @@ -219,21 +229,50 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf assertex(activeOutputCount); // must be >=1, as an output start() invoked prepareInput if (1 == activeOutputCount) return; // single output in use which will be read directly - if (smartBuf) - smartBuf->reset(); + if (sharedRowStream) + sharedRowStream->reset(); else { if (spill) { StringBuffer tempname; GetTempFilePath(tempname, "nsplit"); - smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, queryRowInterfaces(input))); - ActPrintLog("Using temp spill file: %s", tempname.str()); + if (newSplitter) + { + SharedRowStreamReaderOptions options; + if (isContainerized()) + { + StringBuffer planeName; + if (!getDefaultPlane(planeName, "@tempPlane", "temp")) + getDefaultPlane(planeName, "@spillPlane", "spill"); + size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1); + if ((size32_t)-1 != blockedSequentialIOSize) + options.storageBlockSize = blockedSequentialIOSize; + } + options.totalCompressionBufferSize = getOptInt(THOROPT_SPLITTER_COMPRESSIONTOALK, options.totalCompressionBufferSize / 1024) * 1024; + options.inMemMaxMem = getOptInt(THOROPT_SPLITTER_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024; + options.spillWriteAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.spillWriteAheadSize / 1024) * 1024; + options.inMemReadAheadGranularity = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYK, options.inMemReadAheadGranularity / 1024) * 1024; + options.inMemReadAheadGranularityRows = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYROWS, options.inMemReadAheadGranularityRows); + options.heapFlags = getOptInt("spillheapflags", options.heapFlags); + + ICompressHandler *compressHandler = options.totalCompressionBufferSize ?
queryDefaultCompressHandler() : nullptr; + sharedRowStream.setown(createSharedFullSpillingWriteAhead(this, numOutputs, inputStream, isGrouped(), options, this, tempname.str(), compressHandler)); + } + else + { + Owned smartBuf = createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, this); + sharedRowStream.set(smartBuf); + sharedSmartRowWriter.setown(smartBuf->getWriter()); + ActPrintLog("Using temp spill file: %s", tempname.str()); + } } else { ActPrintLog("Spill is 'balanced'"); - smartBuf.setown(createSharedSmartMemBuffer(this, numOutputs, queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE)); + Owned smartBuf = createSharedSmartMemBuffer(this, numOutputs, this, NSPLITTER_SPILL_BUFFER_SIZE); + sharedRowStream.set(smartBuf); + sharedSmartRowWriter.setown(smartBuf->getWriter()); cachedMetaInfo.canStall = true; } // mark any outputs already stopped @@ -241,7 +280,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf { CSplitterOutput *output = (CSplitterOutput *)outputs.item(o); if (output->isStopped() || !connectedOutputSet->test(o)) - smartBuf->queryOutput(o)->stop(); + sharedRowStream->queryOutput(o)->stop(); } } if (!spill) @@ -261,7 +300,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf return inputStream->nextRow(); if (recsReady == current && writeAheadException.get()) throw LINK(writeAheadException); - return smartBuf->queryOutput(outIdx)->nextRow(); // will block until available + return sharedRowStream->queryOutput(outIdx)->nextRow(); // will block until available } rowcount_t writeahead(rowcount_t current, const bool &stopped, Semaphore &writeBlockSem, unsigned outIdx) { @@ -311,7 +350,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf row.setown(inputStream->nextRow()); if (row) { - smartBuf->putRow(nullptr, this); // may call blocked() (see ISharedSmartBufferCallback impl. below) + sharedSmartRowWriter->putRow(nullptr, this); // may call blocked() (see ISharedSmartBufferCallback impl. below) ++recsReady; } } @@ -321,10 +360,10 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf { ActPrintLog("Splitter activity, hit end of input @ rec = %" RCPF "d", recsReady); eofHit = true; - smartBuf->flush(); // signals no more rows will be written. + sharedSmartRowWriter->flush(); // signals no more rows will be written. break; } - smartBuf->putRow(row.getClear(), this); // can block if mem limited, but other readers can progress which is the point + sharedSmartRowWriter->putRow(row.getClear(), this); // can block if mem limited, but other readers can progress which is the point ++recsReady; } return recsReady; @@ -339,12 +378,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf } else { - if (smartBuf) + if (sharedRowStream) { /* If no output has started reading (nextRow()), then it will not have been prepared * If only 1 output is left, it will bypass the smart buffer when it starts. */ - smartBuf->queryOutput(outIdx)->stop(); + sharedRowStream->queryOutput(outIdx)->stop(); } ++stoppedOutputs; if (stoppedOutputs == connectedOutputCount) @@ -357,8 +396,8 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf void abort() { CSlaveActivity::abort(); - if (smartBuf) - smartBuf->cancel(); + if (sharedRowStream) + sharedRowStream->cancel(); } // ISharedSmartBufferCallback impl. 
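All of the new splitter knobs above are expressed in KB at the workunit level and fall back to the compiled-in SharedRowStreamReaderOptions defaults; in containerized deployments the storage block size is additionally seeded from the temp (or spill) plane's blockedSequentialIO attribute. Below is a self-contained sketch of that resolution pattern: the option-name strings match the THOROPT_SPLITTER_* defines added in thormisc.hpp, while the lookup callback and the planeBlockSize parameter are hypothetical stand-ins for getOptInt/getPlaneAttributeValue.

// Sketch: resolve splitter buffer sizes from KB-denominated options, defaulting to the
// SharedRowStreamReaderOptions values and to the storage plane's preferred block size.
#include <cstddef>
#include <functional>

struct SketchSplitterOptions
{
    size_t storageBlockSize = 256 * 1024;            // bytes
    size_t totalCompressionBufferSize = 3000 * 1024; // bytes, shared by writer + readers
    size_t inMemMaxMem = 2000 * 1024;                // bytes held before spilling starts
    size_t spillWriteAheadSize = 2000 * 1024;        // bytes written ahead once spilling
    size_t inMemReadAheadGranularity = 128 * 1024;   // bytes read ahead per request
    size_t inMemReadAheadGranularityRows = 64;       // rows read ahead per request
};

using OptLookup = std::function<size_t(const char *, size_t)>;

SketchSplitterOptions resolveSplitterOptions(const OptLookup &getOpt, size_t planeBlockSize /*0 = unknown*/)
{
    SketchSplitterOptions o;
    if (planeBlockSize)
        o.storageBlockSize = planeBlockSize;  // containerized: plane blockedSequentialIO hint
    auto kb = [&](const char *name, size_t dftBytes) { return getOpt(name, dftBytes / 1024) * 1024; };
    o.totalCompressionBufferSize = kb("splitterCompressionTotalK", o.totalCompressionBufferSize);
    o.inMemMaxMem = kb("splitterRowMemK", o.inMemMaxMem);
    o.spillWriteAheadSize = kb("splitterWriteAheadK", o.spillWriteAheadSize);
    o.inMemReadAheadGranularity = kb("inMemReadAheadGranularityK", o.inMemReadAheadGranularity);
    o.inMemReadAheadGranularityRows = getOpt("inMemReadAheadGranularityRows", o.inMemReadAheadGranularityRows); // row count, not KB
    return o;
}

Note that the class constructor itself additionally caps inMemReadAheadGranularity to inMemMaxMem, so the read-ahead step can never exceed the in-memory budget.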
virtual void paged() { pagedOut = true; } @@ -460,6 +499,11 @@ void CSplitterOutput::start() activity.prepareInput(); if (1 == activity.activeOutputCount) max = RCMAX; // signals that no writeahead required + else + { + if (activity.newSplitter) + splitterStream = activity.sharedRowStream->queryOutput(outIdx); + } dataLinkStart(); } @@ -476,10 +520,16 @@ void CSplitterOutput::stop() const void *CSplitterOutput::nextRow() { ActivityTimer t(slaveTimerStats, activity.queryTimeActivities()); - if (rec == max) // NB: max will be RCMAX if activeOutputCount == 1 - max = activity.writeahead(max, activity.queryAbortSoon(), writeBlockSem, outIdx); - const void *row = activity.nextRow(outIdx, rec); // pass ptr to max if need more - ++rec; + const void *row; + if (splitterStream) + row = splitterStream->nextRow(); + else + { + if (rec == max) // NB: max will be RCMAX if activeOutputCount == 1 + max = activity.writeahead(max, activity.queryAbortSoon(), writeBlockSem, outIdx); + row = activity.nextRow(outIdx, rec); // pass ptr to max if need more + ++rec; + } if (row) dataLinkIncrement(); return row; diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index d64d06da1a2..38054cf60fd 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -15,6 +15,8 @@ limitations under the License. ############################################################################## */ +#include +#include #include "platform.h" #include #include @@ -735,7 +737,7 @@ int chunkSizeCompare2(Chunk *lhs, Chunk *rhs) } #define MIN_POOL_CHUNKS 10 -class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBuffer +class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBuffer, implements ISharedSmartBufferRowWriter { size32_t totalOutChunkSize; bool writeAtEof; @@ -1104,7 +1106,12 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu } // ISharedSmartBuffer - virtual void putRow(const void *row, ISharedSmartBufferCallback *callback) + virtual ISharedSmartBufferRowWriter *getWriter() override + { + return LINK(this); + } +// ISharedSmartBufferRowWriter + virtual void putRow(const void *row, ISharedSmartBufferCallback *callback) override { if (stopped) { @@ -1142,32 +1149,28 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu if (!callback || paged) signalReaders(); } - virtual void putRow(const void *row) + virtual void putRow(const void *row) override { return putRow(row, NULL); } - virtual void flush() + virtual void flush() override { CriticalBlock b(crit); writeAtEof = true; signalReaders(); } - virtual offset_t getPosition() - { - throwUnexpected(); - return 0; - } - virtual IRowStream *queryOutput(unsigned output) +// ISharedRowStreamReader + virtual IRowStream *queryOutput(unsigned output) override { return &outputs.item(output); } - virtual void cancel() + virtual void cancel() override { CriticalBlock b(crit); stopAll(); signalReaders(); } - virtual void reset() + virtual void reset() override { init(); unsigned c=0; @@ -1195,6 +1198,29 @@ bool CRowSet::Release() const return CSimpleInterface::Release(); } +static StringBuffer &getFileIOStats(StringBuffer &output, IFileIO *iFileIO) +{ + __int64 readCycles = iFileIO->getStatistic(StCycleDiskReadIOCycles); + __int64 writeCycles = iFileIO->getStatistic(StCycleDiskWriteIOCycles); + __int64 numReads = iFileIO->getStatistic(StNumDiskReads); + __int64 numWrites = iFileIO->getStatistic(StNumDiskWrites); + offset_t bytesRead = 
iFileIO->getStatistic(StSizeDiskRead); + offset_t bytesWritten = iFileIO->getStatistic(StSizeDiskWrite); + if (readCycles) + output.appendf(", read-time(ms)=%" I64F "d", cycle_to_millisec(readCycles)); + if (writeCycles) + output.appendf(", write-time(ms)=%" I64F "d", cycle_to_millisec(writeCycles)); + if (numReads) + output.appendf(", numReads=%" I64F "d", numReads); + if (numWrites) + output.appendf(", numWrites=%" I64F "d", numWrites); + if (bytesRead) + output.appendf(", bytesRead=%" I64F "d", bytesRead); + if (bytesWritten) + output.appendf(", bytesWritten=%" I64F "d", bytesWritten); + return output; +} + class CSharedWriteAheadDisk : public CSharedWriteAheadBase { Owned spillFile; @@ -1514,10 +1540,14 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase } ~CSharedWriteAheadDisk() { - spillFileIO.clear(); if (spillFile) + { + StringBuffer tracing; + getFileIOStats(tracing, spillFileIO); + activity->ActPrintLog("CSharedWriteAheadDisk: removing spill file: %s%s", spillFile->queryFilename(), tracing.str()); + spillFileIO.clear(); spillFile->remove(); - + } for (;;) { Owned chunk = savedChunks.dequeue(); @@ -1658,6 +1688,478 @@ ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned } +// This implementation is supplied with the input, and reads from it on demand, initially to memory. +// It will spill to disk if the configurable memory limit is exceeded. +// The leading reader (output) causes the implementation to read more rows from the input. +// Once the leader causes the rows in memory to exceed the memory limit, it will cause an output stream to be created. +// From that point on, the leader will write blocks of rows out to disk, +// and cause all readers to read from it, once they have exhausted the in-memory row set.
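For orientation before the class itself, this is roughly how a consumer drives the new reader. It is a non-compilable fragment, not code from the patch: createSharedFullSpillingWriteAhead, queryOutput, nextRow and stop are the interfaces this patch introduces, while activity, inputStream, rowIf, options, tempName and compressHandler are assumed to come from the surrounding activity, as in the splitter changes above.

// Illustrative call pattern only; assumes an ungrouped input stream.
Owned<ISharedRowStreamReader> reader = createSharedFullSpillingWriteAhead(
    activity, numOutputs, inputStream, false, options, rowIf, tempName, compressHandler);

for (unsigned o = 0; o < numOutputs; o++)
{
    IRowStream *out = reader->queryOutput(o); // not linked; remains owned by the reader
    for (;;)
    {
        OwnedConstThorRow row = out->nextRow(); // the leading output pulls more input on demand
        if (!row)
            break;                              // grouped streams would also return null at end-of-group
        // ... consume row ...
    }
    out->stop(); // when the last output stops, any spill file is removed
}

In the splitter itself each output is consumed by a different downstream branch, usually concurrently; the point of the full-spilling design is that the leading output can keep writing ahead to disk without blocking on slower outputs.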
+ +class CSharedFullSpillingWriteAhead : public CInterfaceOf +{ + typedef std::vector Rows; + class COutputRowStream : public CSimpleInterfaceOf + { + CSharedFullSpillingWriteAhead &owner; + unsigned whichOutput = 0; + size32_t localRowsIndex = 0; + rowcount_t lastKnownAvailable = 0; + rowcount_t currentRow = 0; + Rows rows; + OwnedIFileIO iFileIO; + Owned allocator; + Owned inputStream; + CThorStreamDeserializerSource ds; + std::atomic eof = false; + + inline const void *getClearRow(unsigned i) + { + const void *row = rows[i]; + rows[i] = nullptr; + return row; + } + void freeRows() + { + for (auto it = rows.begin() + localRowsIndex; it != rows.end(); ++it) + ReleaseThorRow(*it); + rows.clear(); + localRowsIndex = 0; + allocator->emptyCache(); + } + const void *getRowFromStream() + { + if (currentRow == lastKnownAvailable) + { + if (!owner.checkWriteAhead(lastKnownAvailable)) + { + eof = true; + return nullptr; + } + } + if (owner.inputGrouped) + { + bool eog; + inputStream->read(sizeof(bool), &eog); + if (eog) + { + currentRow++; + return nullptr; + } + } + currentRow++; + RtlDynamicRowBuilder rowBuilder(allocator); + size32_t sz = owner.deserializer->deserialize(rowBuilder, ds); + return rowBuilder.finalizeRowClear(sz); + } + public: + explicit COutputRowStream(CSharedFullSpillingWriteAhead &_owner, unsigned _whichOutput) + : owner(_owner), whichOutput(_whichOutput) + { + allocator.setown(owner.activity.getRowAllocator(owner.meta, (roxiemem::RoxieHeapFlags)owner.options.heapFlags)); + } + ~COutputRowStream() + { + freeRows(); + } + rowcount_t queryLastKnownAvailable() const + { + return lastKnownAvailable; + } + void setLastKnownAvailable(rowcount_t _lastKnownWritten) + { + lastKnownAvailable = _lastKnownWritten; + } + void cancel() + { + eof = true; + } + void reset() + { + freeRows(); + ds.setStream(nullptr); + iFileIO.clear(); + inputStream.clear(); + eof = false; + currentRow = 0; + lastKnownAvailable = 0; + } + virtual const void *nextRow() override + { + if (eof) + return nullptr; + else if (localRowsIndex < rows.size()) // NB: no longer used after inputStream is set + { + currentRow++; + return getClearRow(localRowsIndex++); + } + else if (inputStream) + return getRowFromStream(); // NB: will increment currentRow + else + { + localRowsIndex = 0; + rows.clear(); + + if (owner.getRowsInMem(rows, lastKnownAvailable)) + { + if (rows.empty()) + { + eof = true; + return nullptr; + } + else + { + currentRow++; + return getClearRow(localRowsIndex++); + } + } + else + { + auto [_inputStream, _iFileIO] = owner.getReadStream(); + inputStream.setown(_inputStream); + iFileIO.setown(_iFileIO); + ds.setStream(inputStream); + return getRowFromStream(); // NB: will increment currentRow + } + } + } + virtual void stop() override + { + freeRows(); + ds.setStream(nullptr); + + if (inputStream) + { + StringBuffer tracing; + getFileIOStats(tracing, iFileIO); + owner.activity.ActPrintLog("CSharedFullSpillingWriteAhead::COutputRowStream: input stream finished: output=%u%s", whichOutput, tracing.str()); + + iFileIO.clear(); + inputStream.clear(); + } + + // NB: this will set lastKnownAvailable to max[(rowcount_t)-1] (within owner.readAheadCS) to prevent it being considered as lowest any longer + owner.outputStopped(whichOutput); + + eof = true; + } + }; + CActivityBase &activity; + Linked input; + Linked meta; + Linked serializer; + Linked deserializer; + Linked allocator; + std::vector> outputs; + std::deque> rows; + memsize_t rowsMemUsage = 0; + std::atomic totalInputRowsRead = 0; // not used until 
spilling begins, represents count of all rows read + rowcount_t inMemTotalRows = 0; // whilst in memory, represents count of all rows seen + CriticalSection readAheadCS; // ensure single reader (leader), reads ahead (updates rows/totalInputRowsRead/inMemTotalRows) + Owned iFile; + Owned iFileIO; + Owned outputStream; + Linked compressHandler; + bool nextInputReadEog = false; + bool endOfInput = false; + bool inputGrouped = false; + SharedRowStreamReaderOptions options; + size32_t inMemReadAheadGranularity = 0; + size32_t compressionBlockSize = 0; + + rowcount_t getLowestOutput() + { + // NB: must be called with readAheadCS held + rowcount_t trailingRowPos = (rowcount_t)-1; + for (auto &output: outputs) + { + rowcount_t outputLastKnownWritten = output->queryLastKnownAvailable(); + if (outputLastKnownWritten < trailingRowPos) + trailingRowPos = outputLastKnownWritten; + } + return trailingRowPos; + } + inline rowcount_t getStartIndex() + { + rowcount_t nr = rows.size(); + return inMemTotalRows - nr; + } + inline unsigned getRelativeIndex(rowcount_t index) + { + rowcount_t startIndex = getStartIndex(); + return (unsigned)(index - startIndex); + } + void closeWriter() + { + iFileIO.clear(); + outputStream.clear(); + } + void createOutputStream() + { + // NB: Called once, when spilling starts. + iFileIO.setown(iFile->open(IFOcreate)); // kept for stats purposes + Owned out = createSerialOutputStream(iFileIO); + outputStream.setown(createBufferedOutputStream(out, options.storageBlockSize)); //prefered plane block size + if (compressHandler) + { + const char *compressOptions = nullptr; + Owned compressor = compressHandler->getCompressor(compressOptions); + Owned compressed = createCompressingOutputStream(outputStream, compressor); + outputStream.setown(createBufferedOutputStream(compressed, compressionBlockSize)); + } + totalInputRowsRead = inMemTotalRows; + } + void writeRowsFromInput() + { + // NB: the leading output will be calling this, and it could populate 'outputRows' as it reads ahead + // but we want to readahead + write to disk, more than we want to retain in memory, so keep it simple, + // flush all to disk, meaning this output will also read them back off disk (hopefully from Linux page cache) + rowcount_t newRowsWritten = 0; + offset_t serializedSz = 0; + COutputStreamSerializer outputStreamSerializer(outputStream); + while (!activity.queryAbortSoon()) + { + OwnedConstThorRow row = input->nextRow(); + if (nullptr == row) + { + if (!inputGrouped || nextInputReadEog) + { + endOfInput = true; + break; + } + nextInputReadEog = true; + outputStream->put(sizeof(bool), &nextInputReadEog); + newRowsWritten++; + } + else + { + if (inputGrouped) + { + nextInputReadEog = false; + outputStream->put(sizeof(bool), &nextInputReadEog); + } + serializer->serialize(outputStreamSerializer, (const byte *)row.get()); + newRowsWritten++; + size32_t rowSz = thorRowMemoryFootprint(serializer, row); + serializedSz += rowSz; + if (serializedSz >= options.spillWriteAheadSize) + break; + } + } + outputStream->flush(); + totalInputRowsRead.fetch_add(newRowsWritten); + + // JCSMORE - could track size written, and start new file at this point (e.g. every 100MB), + // and track their starting points (by row #) in a vector + // We could then tell if/when the readers catch up, and remove consumed files as they do. 
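The on-disk record framing used by writeRowsFromInput/getRowFromStream is deliberately simple: for grouped inputs every record position starts with a bool end-of-group marker (true stands alone, false is followed by one serialized row), ungrouped inputs omit the marker entirely, and the writer breaks out to flush once roughly spillWriteAheadSize bytes have been serialized. The standalone sketch below mimics that framing with std::fstream and a length-prefixed payload in place of the HPCC row serializer and the buffered/compressing stream stack; it always writes the marker for simplicity, and all names are illustrative.

// Sketch of the grouped spill framing: [eog bool][optional length-prefixed payload] per record.
#include <cstdint>
#include <fstream>
#include <string>

static void writeRecord(std::ofstream &out, bool eog, const std::string &payload)
{
    out.write(reinterpret_cast<const char *>(&eog), sizeof(eog));
    if (!eog)
    {
        uint32_t len = static_cast<uint32_t>(payload.size()); // stand-in for the row serializer
        out.write(reinterpret_cast<const char *>(&len), sizeof(len));
        out.write(payload.data(), len);
    }
}

static bool readRecord(std::ifstream &in, bool &eog, std::string &payload)
{
    if (!in.read(reinterpret_cast<char *>(&eog), sizeof(eog)))
        return false;              // caller treats this as "ask the writer to write ahead more"
    if (eog)
        return true;               // end-of-group marker, no payload follows
    uint32_t len = 0;
    in.read(reinterpret_cast<char *>(&len), sizeof(len));
    payload.resize(len);
    in.read(&payload[0], len);
    return true;
}

The real write path layers createSerialOutputStream over the file, a buffered stream sized to storageBlockSize, then optionally a compressing stream wrapped in a second buffer sized to the per-stream compression block, so each reader can decompress independently of the writer.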
+ } + void freeRows() + { + for (auto &row: rows) + ReleaseThorRow(std::get<0>(row)); + } +public: + explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler) + : activity(*_activity), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), + meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()) + { + assertex(input); + + // cap inMemReadAheadGranularity to inMemMaxMem + inMemReadAheadGranularity = options.inMemReadAheadGranularity; + if (inMemReadAheadGranularity > options.inMemMaxMem) + inMemReadAheadGranularity = options.inMemMaxMem; + + constexpr size32_t minCompressionBlockSize = 256 * 1024; + memsize_t totalCompressionBufferSize = options.totalCompressionBufferSize; + if (totalCompressionBufferSize) + { + compressionBlockSize = (size32_t)(totalCompressionBufferSize / (numOutputs + 1)); // +1 for writer + if (compressionBlockSize < minCompressionBlockSize) + { + WARNLOG("Shared totalCompressionBufferSize=%" I64F "u, too small for number of outputs(%u). Using minCompressionBlockSize(%u) for writer and each reader.", (unsigned __int64)totalCompressionBufferSize, numOutputs, minCompressionBlockSize); + compressionBlockSize = minCompressionBlockSize; + } + } + for (unsigned o=0; oremove(); + } + freeRows(); + } + void outputStopped(unsigned output) + { + bool allStopped = false; + { + // Mark finished output with max, so that it is not considered by getLowestOutput() + CriticalBlock b(readAheadCS); // read ahead could be active and considering this output + outputs[output]->setLastKnownAvailable((rowcount_t)-1); + if ((rowcount_t)-1 == getLowestOutput()) + allStopped = true; + } + if (allStopped) + { + if (totalInputRowsRead) // only set if spilt + { + StringBuffer tracing; + getFileIOStats(tracing, iFileIO); + activity.ActPrintLog("CSharedFullSpillingWriteAhead: removing spill file: %s%s", iFile->queryFilename(), tracing.str()); + closeWriter(); + iFile->remove(); + } + } + } + std::tuple getReadStream() // also pass back IFileIO for stats purposes + { + Owned iFileIO = iFile->open(IFOread); + Owned in = createSerialInputStream(iFileIO); + Owned inputStream = createBufferedInputStream(in, options.storageBlockSize, 0); + if (compressHandler) + { + const char *decompressOptions = nullptr; + Owned decompressor = compressHandler->getExpander(decompressOptions); + Owned decompressed = createDecompressingInputStream(inputStream, decompressor); + inputStream.setown(createBufferedInputStream(decompressed, compressionBlockSize, 0)); + } + return { inputStream.getClear(), iFileIO.getClear() }; + } + bool checkWriteAhead(rowcount_t &outputRowsAvailable) + { + if (totalInputRowsRead == outputRowsAvailable) + { + CriticalBlock b(readAheadCS); + if (totalInputRowsRead == outputRowsAvailable) // if not, then since gaining the crit, totalInputRowsRead has changed + { + if (endOfInput) + return false; + writeRowsFromInput(); + if (totalInputRowsRead == outputRowsAvailable) // no more were written + { + dbgassertex(endOfInput); + return false; + } + } + } + outputRowsAvailable = totalInputRowsRead; + return true; + } + bool getRowsInMem(Rows &outputRows, rowcount_t &outputRowsAvailable) + { + CriticalBlock b(readAheadCS); + if (outputRowsAvailable == 
inMemTotalRows) // load more + { + // prune unused rows + rowcount_t trailingRowPosRelative = getRelativeIndex(getLowestOutput()); + for (auto it = rows.begin(); it != rows.begin() + trailingRowPosRelative; ++it) + { + auto [row, rowSz] = *it; + rowsMemUsage -= rowSz; + ReleaseThorRow(row); + } + rows.erase(rows.begin(), rows.begin() + trailingRowPosRelative); + + if (outputStream) + { + // this will be the last time this output calls getRowsInMem + // it has exhausted 'rows', and will from here on in read from outputStream + return false; + } + + if (rowsMemUsage >= options.inMemMaxMem) // too much in memory, spill + { + // NB: this will reset rowMemUsage, however, each reader will continue to consume rows until they catch up (or stop) + ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", iFile->queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size()); + createOutputStream(); + return false; + } + + // read more, up to inMemReadAheadGranularity or inMemReadAheadGranularityRows before relinquishing + rowcount_t previousNumRows = rows.size(); + while (true) + { + const void *row = input->nextRow(); + if (row) + { + nextInputReadEog = false; + size32_t sz = thorRowMemoryFootprint(serializer, row); + rows.emplace_back(row, sz); + rowsMemUsage += sz; + if ((rowsMemUsage >= options.inMemReadAheadGranularity) || + (rows.size() >= options.inMemReadAheadGranularityRows)) + break; + } + else + { + if (!inputGrouped || nextInputReadEog) + break; + else + { + nextInputReadEog = true; + rows.emplace_back(nullptr, 0); + } + } + } + inMemTotalRows += rows.size() - previousNumRows; + } + else + { + // this output has not yet reached inMemTotalRows + dbgassertex(outputRowsAvailable < inMemTotalRows); + } + + rowcount_t newRowsAdded = 0; + for (auto it = rows.begin() + getRelativeIndex(outputRowsAvailable); it != rows.end(); ++it) + { + const void *row = std::get<0>(*it); + LinkThorRow(row); + outputRows.push_back(row); + newRowsAdded++; + } + outputRowsAvailable = outputRowsAvailable+newRowsAdded; + + return true; + } +// ISharedRowStreamReader impl. 
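checkWriteAhead compares the atomic totalInputRowsRead against the caller's position before taking readAheadCS and re-checks it after acquiring the lock, so only the output that is genuinely at the frontier pays for reading ahead; getRowsInMem, under the same lock, prunes rows that every output has already passed (getLowestOutput) before loading more. A compact standalone model of that leader-driven, double-checked read-ahead follows, with hypothetical names, std::mutex in place of CriticalSection, and no spill path.

// Sketch: N readers share a window of items; whichever reader reaches the frontier refills it
// under the lock (double-checked against an atomic count), and items already seen by every
// reader are pruned from the front of the window.
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>
#include <vector>

class SketchSharedReadAhead
{
    std::mutex readAheadCS;
    std::atomic<uint64_t> totalRead{0};   // rows pulled from the input so far
    uint64_t windowStart = 0;             // absolute index of window.front()
    std::deque<int> window;               // in-memory rows (ints stand in for row pointers)
    std::vector<uint64_t> readerPos;      // per-reader count of rows already consumed
    int next = 0;                         // fake input source
public:
    explicit SketchSharedReadAhead(unsigned readers) : readerPos(readers, 0) {}

    // Returns the rows visible to 'reader' beyond what it has seen; refills if it is the leader.
    std::vector<int> fetch(unsigned reader, size_t granularity)
    {
        uint64_t seen = readerPos[reader];
        if (totalRead.load() == seen)                 // cheap check outside the lock
        {
            std::lock_guard<std::mutex> b(readAheadCS);
            if (totalRead.load() == seen)             // re-check: another leader may have refilled
            {
                uint64_t lowest = *std::min_element(readerPos.begin(), readerPos.end());
                while (windowStart < lowest)          // prune rows every reader has consumed
                {
                    window.pop_front();
                    ++windowStart;
                }
                for (size_t i = 0; i < granularity; i++) // read ahead a bounded amount
                    window.push_back(next++);
                totalRead.fetch_add(granularity);
            }
        }
        std::vector<int> out;
        std::lock_guard<std::mutex> b2(readAheadCS);  // copy the rows this reader has not seen yet
        for (uint64_t i = seen; i < totalRead.load(); i++)
            out.push_back(window[static_cast<size_t>(i - windowStart)]);
        readerPos[reader] = totalRead.load();
        return out;
    }
};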
+ virtual IRowStream *queryOutput(unsigned output) override + { + return outputs[output]; + } + virtual void cancel() override + { + for (auto &output: outputs) + output->cancel(); + } + virtual void reset() override + { + if (outputStream) // should have already been closed when inputs all stopped + { + closeWriter(); + iFile->remove(); + } + for (auto &output: outputs) + output->reset(); + freeRows(); + rows.clear(); + rowsMemUsage = 0; + totalInputRowsRead = 0; + inMemTotalRows = 0; + nextInputReadEog = false; + endOfInput = false; + } +}; + +ISharedRowStreamReader *createSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &options, IThorRowInterfaces *_rowIf, const char *tempFileName, ICompressHandler *compressHandler) +{ + return new CSharedFullSpillingWriteAhead(_activity, numOutputs, _input, _inputGrouped, options, _rowIf, tempFileName, compressHandler); +} + + class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWriterReader { rowidx_t readGranularity, writeGranularity, rowPos, limit, rowsToRead; diff --git a/thorlcr/thorutil/thbuf.hpp b/thorlcr/thorutil/thbuf.hpp index f75b353951f..dc64aeb888d 100644 --- a/thorlcr/thorutil/thbuf.hpp +++ b/thorlcr/thorutil/thbuf.hpp @@ -55,6 +55,24 @@ extern graph_decl ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *act IThorRowInterfaces *rowIf, size32_t buffsize); +struct SharedRowStreamReaderOptions +{ + offset_t storageBlockSize = 256 * 1024; // block size of read/write streams + memsize_t totalCompressionBufferSize = 3000 * 1024; // compression buffer size of read streams (split between writer and outputs) + memsize_t inMemMaxMem = 2000 * 1024; // before spilling begins. + memsize_t inMemReadAheadGranularity = 128 * 1024; // granularity (K) of read ahead + rowcount_t inMemReadAheadGranularityRows = 64; // granularity (rows) of read ahead. 
NB: whichever granularity is hit first + offset_t spillWriteAheadSize = 2000 * 1024; // once spilling, maximum size to write ahead + unsigned heapFlags = roxiemem::RHFunique|roxiemem::RHFblocked; +}; +interface ISharedRowStreamReader : extends IInterface +{ + virtual IRowStream *queryOutput(unsigned output) = 0; + virtual void cancel()=0; + virtual void reset() = 0; +}; + + // Multiple readers, one writer interface ISharedSmartBufferCallback { @@ -62,18 +80,20 @@ interface ISharedSmartBufferCallback virtual void blocked() = 0; virtual void unblocked() = 0; }; -interface ISharedSmartBuffer : extends IRowWriter + +interface ISharedSmartBufferRowWriter : extends IRowWriter { - using IRowWriter::putRow; virtual void putRow(const void *row, ISharedSmartBufferCallback *callback) = 0; // extended form of putRow, which signals when pages out via callback - virtual IRowStream *queryOutput(unsigned output) = 0; - virtual void cancel()=0; - virtual void reset() = 0; +}; + +interface ISharedSmartBuffer : extends ISharedRowStreamReader +{ + virtual ISharedSmartBufferRowWriter *getWriter() = 0; }; extern graph_decl ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IThorRowInterfaces *rowif, unsigned buffSize=((unsigned)-1)); extern graph_decl ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *tempname, unsigned outputs, IThorRowInterfaces *rowif); - +extern graph_decl ISharedRowStreamReader *createSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &options, IThorRowInterfaces *_rowIf, const char *tempFileName, ICompressHandler *compressHandler); interface IRowWriterMultiReader : extends IRowWriter { diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 2bc8f0c776d..a3f394a83c1 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -55,6 +55,11 @@ #define THOROPT_HDIST_COMP "hdCompressorType" // Distribute compressor to use (default = "LZ4") #define THOROPT_HDIST_COMPOPTIONS "hdCompressorOptions" // Distribute compressor options, e.g. AES key (default = "") #define THOROPT_SPLITTER_SPILL "splitterSpill" // Force splitters to spill or not, default is to adhere to helper setting (default = -1) +#define THOROPT_SPLITTER_MAXROWMEMK "splitterRowMemK" // Splitter max memory (K) to use before spilling (default = 2MB) +#define THOROPT_SPLITTER_READAHEADGRANULARITYK "inMemReadAheadGranularityK" // Splitter in memory read ahead granularity (K) (default = 128K) +#define THOROPT_SPLITTER_READAHEADGRANULARITYROWS "inMemReadAheadGranularityRows" // Splitter in memory read ahead granularity (# rows) (default = 64) +#define THOROPT_SPLITTER_WRITEAHEADK "splitterWriteAheadK" // Splitter spilling write ahead size (K) (default = 2MB) +#define THOROPT_SPLITTER_COMPRESSIONTOALK "splitterCompressionTotalK" // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB) #define THOROPT_LOOP_MAX_EMPTY "loopMaxEmpty" // Max # of iterations that LOOP can cycle through with 0 results before errors (default = 1000) #define THOROPT_SMALLSORT "smallSortThreshold" // Use minisort approach, if estimate size of data to sort is below this setting (default = 0) #define THOROPT_PARALLEL_FUNNEL "parallelFunnel" // Use parallel funnel impl. if !ordered (default = true)
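One sizing detail worth calling out from the new defaults above: totalCompressionBufferSize (splitterCompressionTotalK, 3000KB by default) is divided between the writer and every reader and clamped to a 256KB minimum per stream, so heavily split graphs fall back to the floor (with a warning) rather than shrinking compression buffers indefinitely. A small standalone illustration of that arithmetic; the function name is hypothetical, while the constant and formula mirror the CSharedFullSpillingWriteAhead constructor.

// Per-stream compression block size: the total buffer is shared by the writer plus
// numOutputs readers, and never drops below a 256KB floor.
#include <cstddef>
#include <cstdio>

constexpr size_t minCompressionBlockSize = 256 * 1024;

size_t perStreamCompressionBlockSize(size_t totalCompressionBufferSize, unsigned numOutputs)
{
    if (!totalCompressionBufferSize)
        return 0;                                                     // compression disabled
    size_t blockSize = totalCompressionBufferSize / (numOutputs + 1); // +1 for the writer
    return blockSize < minCompressionBlockSize ? minCompressionBlockSize : blockSize;
}

int main()
{
    // With the 3000KB default: 3 outputs -> 750KB per stream; 16 outputs -> ~176KB,
    // which is clamped up to 256KB (the real code logs a warning when the clamp applies).
    std::printf("%zu\n", perStreamCompressionBlockSize(3000 * 1024, 3));
    std::printf("%zu\n", perStreamCompressionBlockSize(3000 * 1024, 16));
    return 0;
}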