From 2b71ffb1134a775dd340deee076928c5fcb8c9c8 Mon Sep 17 00:00:00 2001
From: Jake Smith
Date: Wed, 19 Jun 2024 12:44:19 +0100
Subject: [PATCH] framework

---
 system/jlib/jqueue.hpp                        |  49 ++
 .../activities/nsplitter/thnsplitterslave.cpp |   4 +-
 thorlcr/activities/thactivityutil.cpp         |  40 +-
 thorlcr/thorutil/thbuf.cpp                    | 537 +++++++++++++++++-
 thorlcr/thorutil/thbuf.hpp                    |  23 +-
 thorlcr/thorutil/thormisc.hpp                 |   7 +-
 6 files changed, 646 insertions(+), 14 deletions(-)

diff --git a/system/jlib/jqueue.hpp b/system/jlib/jqueue.hpp
index e9447833c65..d9d33d1aed4 100644
--- a/system/jlib/jqueue.hpp
+++ b/system/jlib/jqueue.hpp
@@ -546,5 +546,54 @@ class DListOf
     }
 };
 
+template <class T>
+class CSPSCQueue
+{
+    size32_t maxCapacity = 0;
+    std::vector<T> elements;
+    std::atomic<size32_t> head = 0;
+    std::atomic<size32_t> tail = 0;
+
+    inline size32_t increment(size32_t idx) const
+    {
+        return (idx + 1) % maxCapacity;
+    }
+public:
+    CSPSCQueue()
+    {
+        // should set capacity before using
+    }
+    CSPSCQueue(size32_t _maxCapacity)
+        : maxCapacity(_maxCapacity + 1), // +1 to distinguish full vs empty
+          elements(maxCapacity)
+    {
+    }
+    void setCapacity(size32_t _maxCapacity)
+    {
+        maxCapacity = _maxCapacity + 1;
+        elements.resize(maxCapacity);
+    }
+    bool enqueue(T e) // NB: by value; a const parameter would make the std::move below copy
+    {
+        size32_t currentHead = head;
+        size32_t nextHead = increment(currentHead);
+        if (nextHead == tail)
+            return false; // full
+
+        elements[currentHead] = std::move(e);
+        head = nextHead;
+        return true;
+    }
+    bool dequeue(T &res)
+    {
+        size32_t currentTail = tail;
+        if (currentTail == head)
+            return false; // empty
+
+        res = std::move(elements[currentTail]);
+        tail = increment(currentTail);
+        return true;
+    }
+};
 
 #endif
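CSPSCQueue above is the classic one-slot-empty single-producer/single-consumer ring buffer: head is advanced only by the producer, tail only by the consumer, and one slot is sacrificed so that head == tail always means empty and increment(head) == tail always means full. Here is a minimal self-contained sketch of the same discipline, useful for convincing yourself the full/empty tests are right. It uses standard C++ stand-ins (size_t for size32_t); the explicit acquire/release orderings are my assumption for a leaner illustration — the patch's version relies on the stronger default sequentially-consistent atomics.

    #include <atomic>
    #include <cassert>
    #include <cstddef>
    #include <thread>
    #include <vector>

    template <class T>
    class SpscQueue
    {
        size_t maxCapacity;          // requested capacity + 1: one slot stays empty so full != empty
        std::vector<T> elements;
        std::atomic<size_t> head{0}; // next slot to write; advanced by the producer only
        std::atomic<size_t> tail{0}; // next slot to read; advanced by the consumer only

        size_t next(size_t idx) const { return (idx + 1) % maxCapacity; }
    public:
        explicit SpscQueue(size_t capacity) : maxCapacity(capacity + 1), elements(capacity + 1) {}
        bool enqueue(T e)
        {
            size_t h = head.load(std::memory_order_relaxed);
            if (next(h) == tail.load(std::memory_order_acquire))
                return false; // full
            elements[h] = std::move(e);
            head.store(next(h), std::memory_order_release);
            return true;
        }
        bool dequeue(T &res)
        {
            size_t t = tail.load(std::memory_order_relaxed);
            if (t == head.load(std::memory_order_acquire))
                return false; // empty
            res = std::move(elements[t]);
            tail.store(next(t), std::memory_order_release);
            return true;
        }
    };

    int main()
    {
        SpscQueue<int> q(1024);
        std::thread producer([&] {
            for (int i = 0; i < 100000; i++)
                while (!q.enqueue(i)) {} // spin when full; the row stream blocks on a semaphore instead
        });
        long long sum = 0;
        for (int seen = 0; seen < 100000; )
        {
            int v;
            if (q.dequeue(v)) { sum += v; seen++; }
        }
        producer.join();
        assert(sum == 100000LL * 99999 / 2); // every produced value was consumed exactly once
        return 0;
    }

The spin loops stand in for the blocking that CCompressedSpillingRowStream layers on top via its moreRows semaphore.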
diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp
index 191d005fa9a..de22da08908 100644
--- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp
+++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp
@@ -251,9 +251,9 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
             if ((size32_t)-1 != blockedSequentialIOSize)
                 options.storageBlockSize = blockedSequentialIOSize;
         }
-        options.totalCompressionBufferSize = getOptInt(THOROPT_SPLITTER_COMPRESSIONTOALK, options.totalCompressionBufferSize / 1024) * 1024;
+        options.totalCompressionBufferSize = getOptInt(THOROPT_SPLITTER_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024;
         options.inMemMaxMem = getOptInt(THOROPT_SPLITTER_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024;
-        options.spillWriteAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.spillWriteAheadSize / 1024) * 1024;
+        options.writeAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.writeAheadSize / 1024) * 1024;
         options.inMemReadAheadGranularity = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYK, options.inMemReadAheadGranularity / 1024) * 1024;
         options.inMemReadAheadGranularityRows = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYROWS, options.inMemReadAheadGranularity);
         options.heapFlags = getOptInt("spillheapflags", options.heapFlags);
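All of these splitter options follow the same K-unit convention: the stored byte value is divided by 1024 to form the option lookup's default, and the looked-up result is multiplied back up. A tiny standalone sketch of that round trip — getOptInt64 and the option map here are simplified stand-ins of my own, not the Thor API:

    #include <cstdint>
    #include <cstdio>
    #include <map>
    #include <string>

    // hypothetical stand-in for CActivityBase::getOptInt64: workunit option if set, else default
    static std::map<std::string, int64_t> workunitOptions = { { "splitterWriteAheadK", 4096 } };

    static int64_t getOptInt64(const char *name, int64_t defaultValue)
    {
        auto it = workunitOptions.find(name);
        return (it != workunitOptions.end()) ? it->second : defaultValue;
    }

    int main()
    {
        uint64_t writeAheadSize = 2000 * 1024; // built-in default, in bytes
        // The option is expressed in K: the byte default is scaled down for the lookup
        // and the result scaled back up, so an unset option preserves the built-in
        // default (modulo truncation of any sub-K remainder).
        writeAheadSize = getOptInt64("splitterWriteAheadK", writeAheadSize / 1024) * 1024;
        printf("writeAheadSize = %llu bytes\n", (unsigned long long)writeAheadSize); // 4096K -> 4194304
        return 0;
    }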
diff --git a/thorlcr/activities/thactivityutil.cpp b/thorlcr/activities/thactivityutil.cpp
index f5701672c18..ee7283f2f05 100644
--- a/thorlcr/activities/thactivityutil.cpp
+++ b/thorlcr/activities/thactivityutil.cpp
@@ -66,6 +66,8 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
     rowcount_t required;
     Semaphore startSem;
     Owned<IException> getexception;
+    LookAheadOptions options;
+    bool newLookAhead = false;
 
     class CThread: public Thread
     {
@@ -94,12 +96,19 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
     {
         try
         {
-            StringBuffer temp;
-            if (allowspill)
-                GetTempFilePath(temp,"lookahd");
             assertex(bufsize);
             if (allowspill)
-                smartbuf.setown(createSmartBuffer(&activity, temp.str(), bufsize, rowIf));
+            {
+                StringBuffer temp;
+                GetTempFilePath(temp,"lookahd");
+                if (newLookAhead)
+                {
+                    ICompressHandler *compressHandler = options.totalCompressionBufferSize ? queryDefaultCompressHandler() : nullptr;
+                    smartbuf.setown(createCompressedSpillingRowStream(&activity, temp.str(), preserveGrouping, rowIf, options, compressHandler));
+                }
+                else
+                    smartbuf.setown(createSmartBuffer(&activity, temp.str(), bufsize, rowIf));
+            }
             else
                 smartbuf.setown(createSmartInMemoryBuffer(&activity, rowIf, bufsize));
             startSem.signal();
@@ -207,6 +216,29 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
         running = true;
         required = _required;
         count = 0;
+
+        newLookAhead = activity.getOptBool("newlookahead", false);
+        if (activity.getOptBool("forcenewlookahead", false))
+        {
+            newLookAhead = true;
+            allowspill = true;
+        }
+
+        // JCSMORE for "newlookahead" only
+        if (isContainerized())
+        {
+            // JCSMORE - add CJobBase::getTempBlockSize() to calc. once.
+            StringBuffer planeName;
+            if (!getDefaultPlane(planeName, "@tempPlane", "temp"))
+                getDefaultPlane(planeName, "@spillPlane", "spill");
+            size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1);
+            if ((size32_t)-1 != blockedSequentialIOSize)
+                options.storageBlockSize = blockedSequentialIOSize;
+        }
+        options.totalCompressionBufferSize = activity.getOptInt(THOROPT_LOOKAHEAD_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024;
+        options.inMemMaxMem = activity.getOptInt(THOROPT_LOOKAHEAD_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024;
+        options.writeAheadSize = activity.getOptInt64(THOROPT_LOOKAHEAD_WRITEAHEADK, options.writeAheadSize / 1024) * 1024;
+        options.tempFileGranularity = activity.getOptInt64(THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY, options.tempFileGranularity / 0x100000) * 0x100000;
     }
     ~CRowStreamLookAhead()
     {
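CRowStreamLookAhead's thread eagerly pulls rows from the upstream activity into whichever smart buffer was selected above, while the downstream activity consumes at its own pace. A minimal sketch of that producer/consumer shape — a condition-variable bounded buffer of ints stands in for ISmartRowBuffer and the Thor row interfaces, which are all assumptions of this sketch. Note that the real flush(), per the comment in the thbuf.cpp diff below, additionally blocks until the reader has consumed everything or stopped; this one only marks end of stream.

    #include <condition_variable>
    #include <cstdio>
    #include <deque>
    #include <mutex>
    #include <optional>
    #include <thread>

    class BoundedBuffer
    {
        std::mutex m;
        std::condition_variable cv;
        std::deque<int> q;
        size_t limit;
        bool eos = false;
    public:
        explicit BoundedBuffer(size_t _limit) : limit(_limit) {}
        void putRow(int row)
        {
            std::unique_lock<std::mutex> l(m);
            cv.wait(l, [&] { return q.size() < limit; }); // block while the buffer is full
            q.push_back(row);
            cv.notify_all();
        }
        void flush() // no more rows will be written
        {
            std::lock_guard<std::mutex> l(m);
            eos = true;
            cv.notify_all();
        }
        std::optional<int> nextRow()
        {
            std::unique_lock<std::mutex> l(m);
            cv.wait(l, [&] { return !q.empty() || eos; });
            if (q.empty())
                return std::nullopt; // end of stream
            int row = q.front();
            q.pop_front();
            cv.notify_all();
            return row;
        }
    };

    int main()
    {
        BoundedBuffer buf(16);
        // the lookahead thread eagerly pulls from the (simulated) upstream activity
        std::thread lookahead([&] {
            for (int i = 0; i < 1000; i++)
                buf.putRow(i);
            buf.flush();
        });
        long long total = 0;
        while (auto row = buf.nextRow()) // downstream consumes at its own pace
            total += *row;
        lookahead.join();
        printf("total=%lld\n", total); // 499500
        return 0;
    }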
diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp
index 951b7db82d2..d8d3b37af6b 100644
--- a/thorlcr/thorutil/thbuf.cpp
+++ b/thorlcr/thorutil/thbuf.cpp
@@ -17,10 +17,12 @@
 #include 
 #include 
+#include 
 #include "platform.h"
 #include 
 #include 
 #include "jlib.hpp"
+#include "jqueue.hpp"
 #include "jmisc.hpp"
 #include "jio.hpp"
 #include "jlzw.hpp"
@@ -606,6 +608,534 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff
     }
 };
 
+//#define TRACE_SPILLING_ROWSTREAM // traces each row read/written, and other events
+
+// based on a query that produces records with a single sequential (from 1) unsigned4
+//#define VERIFY_ROW_IDS_SPILLING_ROWSTREAM
+
+//#define STRESSTEST_SPILLING_ROWSTREAM // 'stressLookAhead' code, which when enabled, reduces buffer sizes etc. to stress test the lookahead spilling
+
+
+/* CCompressedSpillingRowStream implementation details:
+   - Writer:
+     - The writer writes to an in-memory queue; when the queue is full, or a certain number of rows have been queued, it starts writing to temp files.
+     - The writer will always write to the queue if it can, even after it has started spilling.
+     - The writer commits to disk at LookAheadOptions::writeAheadSize granularity.
+     - The writer creates a new temp file when the current one reaches LookAheadOptions::tempFileGranularity.
+     - The writer pushes the current nextOutputRow to a queue when it creates the next output file (used by the reader to know when to move to the next file).
+     - NB: the writer implements ISmartRowBuffer::flush(), which has slightly unusual semantics (it blocks until everything is read or stopped).
+   - Reader:
+     - The reader will read from the queue until it is exhausted, then block waiting to be signalled that there are more rows.
+     - If the reader dequeues a row that is ahead of the expected 'nextInputRow', it will stash it, and read from disk until it catches up to that row.
+     - If the reader is reading from disk and catches up with 'committedRows', it will block until the writer has committed more rows.
+     - When reading from a temp file, it takes ownership of the CFileOwner and disposes of the underlying file when it has consumed it.
+     - The reader will read from the stream until it hits 'currentTempFileEndRow' (initially 0), at which point it will open the next temp file.
+ */
+
+// NB: Supports being read by 1 thread and written to by another only
+class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>, implements IRowWriter
+{
+    typedef std::tuple<const void *, rowcount_t, size32_t> RowEntry; // row, output row number, row memory size
+
+    CActivityBase &activity;                  // ctor input parameter
+    StringAttr baseTmpFilename;               // ctor input parameter
+    LookAheadOptions options;                 // ctor input parameter
+    Linked<ICompressHandler> compressHandler; // ctor input parameter
+
+    // derived from input parameter (IThorRowInterfaces *rowIf)
+    Linked<IOutputMetaData> meta;
+    Linked<IOutputRowSerializer> serializer;
+    Linked<IEngineRowAllocator> allocator;
+    Linked<IOutputRowDeserializer> deserializer;
+    memsize_t compressionBlockSize = 0; // filled in by createNextOutputStream
+
+    // in-memory related members
+    CSPSCQueue<RowEntry> inMemRows;
+    std::atomic<memsize_t> inMemRowsMemoryUsage = 0; // NB: updated from writer and reader threads
+    Semaphore moreRows;
+    std::atomic<bool> readerWaitingForQ = false; // set by reader, cleared by writer
+
+    // temp write related members
+    Owned<IBufferedSerialOutputStream> outputStream;
+    std::unique_ptr<COutputStreamSerializer> outputStreamSerializer;
+    memsize_t pendingFlushToDiskSz = 0;
+    offset_t currentTempFileSize = 0;
+    Owned<IFileIO> currentOutputIFileIO; // keep for stats
+    CSPSCQueue<CFileOwner *> outputFiles;
+    unsigned writeTempFileNum = 0;
+    std::atomic<rowcount_t> nextOutputRow = 0; // read by reader, updated by writer
+    std::atomic<rowcount_t> committedRows = 0; // read by reader, updated by writer
+    std::atomic<bool> spilt = false;           // set by writeRowToStream, checked by reader
+    std::deque<rowcount_t> outputFileEndRowMarkers;
+    bool lastWriteWasEog = false;
+    bool outputComplete = false;
+    bool recentlyQueued = false;
+    CriticalSection outputStreamCS;
+
+    // temp read related members
+    unsigned readTempFileNum = 0;
+    std::atomic<rowcount_t> currentTempFileEndRow = 0;
+    Owned<IFileIO> currentInputIFileIO; // keep for stats
+    Linked<CFileOwner> currentOwnedInputFile;
+    Owned<IBufferedSerialInputStream> inputStream;
+    CThorStreamDeserializerSource inputDeserializerSource;
+    rowcount_t nextInputRow = 0;
+    bool readerWaitingForCommit = false;
+    static constexpr unsigned readerWakeupGranularity = 32; // how often to wake up the reader if it is waiting for more rows
+    enum ReadState { rs_fromqueue, rs_frommarker, rs_endstream, rs_stopped } readState = rs_fromqueue;
+    RowEntry readFromStreamMarker = { nullptr, 0, 0 };
+
+    // misc
+    bool grouped = false; // ctor input parameter
+    CriticalSection readerWriterCS;
+#ifdef STRESSTEST_SPILLING_ROWSTREAM
+    bool stressTest = false;
+#endif
+
+    // annoying flush semantics
+    bool flushWaiting = false;
+    Semaphore flushWaitSem;
+
+    void trace(const char *format, ...)
+    {
+#ifdef TRACE_SPILLING_ROWSTREAM
+        va_list args;
+        va_start(args, format);
+        VALOG(MCdebugInfo, format, args);
+        va_end(args);
+#endif
+    }
+    void createNextOutputStream()
+    {
+        VStringBuffer tmpFilename("%s.%u", baseTmpFilename.get(), writeTempFileNum++);
+        trace("WRITE: writing to %s", tmpFilename.str());
+        Owned<IFile> iFile = createIFile(tmpFilename);
+        assertex(outputFiles.enqueue(new CFileOwner(iFile, activity.queryTempFileSizeTracker())));
+
+        currentOutputIFileIO.setown(iFile->open(IFOcreate)); // kept for stats purposes
+        Owned<ISerialOutputStream> out = createSerialOutputStream(currentOutputIFileIO);
+        outputStream.setown(createBufferedOutputStream(out, options.storageBlockSize)); // preferred plane block size
+        if (compressHandler)
+        {
+            const char *compressOptions = nullptr;
+            Owned<ICompressor> compressor = compressHandler->getCompressor(compressOptions);
+            Owned<ISerialOutputStream> compressed = createCompressingOutputStream(outputStream, compressor);
+            compressionBlockSize = std::max(options.totalCompressionBufferSize, (memsize_t)(256 * 1024));
+            outputStream.setown(createBufferedOutputStream(compressed, compressionBlockSize));
+        }
+        outputStreamSerializer = std::make_unique<COutputStreamSerializer>(outputStream);
+    }
+    void createNextInputStream()
+    {
+        dbgassertex(readTempFileNum <= writeTempFileNum);
+        CFileOwner *dequeuedOwnedIFile = nullptr;
+        bool tf = outputFiles.dequeue(dequeuedOwnedIFile);
+        assertex(tf);
+        currentOwnedInputFile.setown(dequeuedOwnedIFile);
+        IFile *iFile = &currentOwnedInputFile->queryIFile();
+        trace("READ: reading from %s", iFile->queryFilename());
+        currentInputIFileIO.setown(iFile->open(IFOread));
+        Owned<ISerialInputStream> in = createSerialInputStream(currentInputIFileIO);
+        inputStream.setown(createBufferedInputStream(in, options.storageBlockSize, 0));
+        if (compressHandler)
+        {
+            const char *decompressOptions = nullptr;
+            Owned<IExpander> decompressor = compressHandler->getExpander(decompressOptions);
+            Owned<ISerialInputStream> decompressed = createDecompressingInputStream(inputStream, decompressor);
+            inputStream.setown(createBufferedInputStream(decompressed, compressionBlockSize, 0));
+        }
+        inputDeserializerSource.setStream(inputStream);
+    }
+    const void *readRowFromStream()
+    {
+        // NB: currentTempFileEndRow will be 0 on the 1st input read.
+        // nextInputRow can be > currentTempFileEndRow, because the writer/reader may have used the Q
+        // beyond this point; the next row in the stream could be anywhere above it.
+        if (nextInputRow >= currentTempFileEndRow)
+        {
+            createNextInputStream();
+            CriticalBlock b(outputStreamCS);
+            if (nextInputRow >= currentTempFileEndRow)
+            {
+                if (!outputFileEndRowMarkers.empty())
+                {
+                    // if the reader caught up, and had previously set currentTempFileEndRow to unbounded,
+                    // but now we have entries, spin through until outputFileEndRowMarkers is ahead of nextInputRow
+                    while (true)
+                    {
+                        currentTempFileEndRow = outputFileEndRowMarkers.front();
+                        outputFileEndRowMarkers.pop_front();
+                        if (currentTempFileEndRow > nextInputRow)
+                        {
+                            trace("READ: setting currentTempFileEndRow: %" RCPF "u", currentTempFileEndRow.load());
+                            break;
+                        }
+                        else if (outputFileEndRowMarkers.empty())
+                        {
+                            trace("READ: setting currentTempFileEndRow: unbounded (2)");
+                            currentTempFileEndRow = (rowcount_t)-1; // unbounded for now, writer will set when it knows
+                            break;
+                        }
+                    }
+                }
+                else
+                {
+                    currentTempFileEndRow = (rowcount_t)-1; // unbounded for now, writer will set when it knows
+                    trace("READ: setting currentTempFileEndRow: unbounded");
+                }
+            }
+        }
+        if (grouped)
+        {
+            bool eog;
+            inputStream->read(sizeof(bool), &eog);
+            if (eog)
+                return nullptr;
+        }
+        RtlDynamicRowBuilder rowBuilder(allocator);
+        size32_t sz = deserializer->deserialize(rowBuilder, inputDeserializerSource);
+        const void *row = rowBuilder.finalizeRowClear(sz);
+        checkCurrentRow("S: ", row, nextInputRow);
+        return row;
+    }
+    void writeRowToStream(const void *row, size32_t rowSz)
+    {
+        if (!spilt)
+        {
+            spilt = true;
+            ActPrintLog(&activity, "Spilling to temp storage [file = %s]", baseTmpFilename.get());
+            createNextOutputStream();
+        }
+        if (grouped)
+        {
+            bool eog = (nullptr == row);
+            outputStream->put(sizeof(bool), &eog);
+            pendingFlushToDiskSz++;
+            if (nullptr == row)
+                return;
+        }
+        serializer->serialize(*outputStreamSerializer.get(), (const byte *)row);
+        pendingFlushToDiskSz += rowSz;
+    }
+    void checkReleaseQBlockReader()
+    {
+        if (readerWaitingForQ)
+        {
+            readerWaitingForQ = false;
+            moreRows.signal();
+        }
+    }
+    void checkReleaseReaderCommitBlocked()
+    {
+        if (readerWaitingForCommit)
+        {
+            readerWaitingForCommit = false;
+            moreRows.signal();
+        }
+    }
+    void handleInputComplete()
+    {
+        readState = rs_stopped;
+        if (flushWaiting)
+        {
+            flushWaiting = false;
+            flushWaitSem.signal();
+        }
+    }
+    bool checkFlushToDisk(size32_t threshold)
+    {
+        if (pendingFlushToDiskSz <= threshold)
+            return false;
+        rowcount_t _nextOutputRow = nextOutputRow.load();
+        trace("WRITE: Flushed to disk. nextOutputRow = %" RCPF "u", _nextOutputRow);
+        outputStream->flush();
+        currentTempFileSize += pendingFlushToDiskSz;
+        pendingFlushToDiskSz = 0;
+        if (currentTempFileSize > options.tempFileGranularity)
+        {
+            currentTempFileSize = 0;
+            {
+                CriticalBlock b(outputStreamCS);
+                if ((rowcount_t)-1 == currentTempFileEndRow)
+                {
+                    currentTempFileEndRow = _nextOutputRow;
+                    trace("WRITE: setting currentTempFileEndRow: %" RCPF "u", currentTempFileEndRow.load());
+                }
+                outputFileEndRowMarkers.push_back(nextOutputRow);
+                trace("WRITE: adding to tempFileEndRowMarker(size=%u): %" RCPF "u", (unsigned)outputFileEndRowMarkers.size(), _nextOutputRow);
+            }
+            createNextOutputStream();
+        }
+        committedRows = _nextOutputRow;
+        return true;
+    }
+    void addRow(const void *row)
+    {
+        bool queued = false;
+        size32_t rowSz = row ? thorRowMemoryFootprint(serializer, row) : 0;
+        if (rowSz + inMemRowsMemoryUsage < options.inMemMaxMem)
+            queued = inMemRows.enqueue({ row, nextOutputRow, rowSz });
+        if (queued)
+        {
+            trace("WRITE: Q: nextOutputRow: %" RCPF "u", nextOutputRow.load());
+            inMemRowsMemoryUsage += rowSz;
+            ++nextOutputRow;
+        }
+        else
+        {
+            trace("WRITE: S: nextOutputRow: %" RCPF "u", nextOutputRow.load());
+            writeRowToStream(row, rowSz); // JCSMORE - rowSz is memory not disk size... does it matter that much?
+            ++nextOutputRow;
+            if (checkFlushToDisk(options.writeAheadSize))
+            {
+                CriticalBlock b(readerWriterCS);
+                checkReleaseReaderCommitBlocked();
+            }
+        }
+
+        // do not wake up the reader every time a row is queued (only granularly) to avoid excessive flapping
+        if (queued)
+            recentlyQueued = true;
+        if (recentlyQueued && (0 == (nextOutputRow % readerWakeupGranularity)))
+        {
+            recentlyQueued = false;
+            CriticalBlock b(readerWriterCS);
+            checkReleaseQBlockReader();
+        }
+    }
+    const void *getQRow(RowEntry &e)
+    {
+        rowcount_t writeRow = std::get<1>(e);
+        inMemRowsMemoryUsage -= std::get<2>(e);
+        if (writeRow == nextInputRow)
+        {
+#ifdef STRESSTEST_SPILLING_ROWSTREAM
+            if (0 == (nextInputRow % 100))
+                MilliSleep(5);
+#endif
+            const void *row = std::get<0>(e);
+            checkCurrentRow("Q: ", row, nextInputRow);
+            ++nextInputRow;
+            return row;
+        }
+        else
+        {
+            // queued row is ahead of the reader position, save a marker and read from the stream until the marker
+            dbgassertex(writeRow > nextInputRow);
+            readFromStreamMarker = e;
+            readState = rs_frommarker;
+            return readToMarker();
+        }
+    }
+    inline void checkCurrentRow(const char *msg, const void *row, rowcount_t expectedId)
+    {
+#ifdef VERIFY_ROW_IDS_SPILLING_ROWSTREAM
+        unsigned id;
+        memcpy(&id, row, sizeof(unsigned));
+        assertex(id-1 == expectedId);
+        trace("READ: %s nextInputRow: %" RCPF "u", msg, expectedId);
+#endif
+    }
+    const void *readToMarker()
+    {
+        rowcount_t markerRow = std::get<1>(readFromStreamMarker);
+        if (markerRow == nextInputRow)
+        {
+            const void *ret = std::get<0>(readFromStreamMarker);
+            checkCurrentRow("M: ", ret, nextInputRow);
+            readFromStreamMarker = { nullptr, 0, 0 };
+            readState = rs_fromqueue;
+            ++nextInputRow;
+            return ret;
+        }
+        else if (markerRow > committedRows) // rows we need have not yet been committed to disk
+        {
+            CLeavableCriticalBlock b(readerWriterCS);
+            if (markerRow > committedRows) // JCSMORE this is waiting too long, I don't need markerRow to be committed, I need nextInputRow to be committed ..
+            {
+                // wait for writer to commit
+                readerWaitingForCommit = true;
+                b.leave();
+                trace("READ: waiting for committedRows(currently = %" RCPF "u) to catch up to markerRow(%" RCPF "u), nextInputRow = %" RCPF "u", committedRows.load(), markerRow, nextInputRow);
+                moreRows.wait();
+                assertex(markerRow <= committedRows);
+            }
+        }
+        const void *row = readRowFromStream();
+        ++nextInputRow;
+        return row;
+    }
+public:
+    IMPLEMENT_IINTERFACE_O_USING(CSimpleInterfaceOf<ISmartRowBuffer>);
+
+    explicit CCompressedSpillingRowStream(CActivityBase *_activity, const char *_baseTmpFilename, bool _grouped, IThorRowInterfaces *rowIf, const LookAheadOptions &_options, ICompressHandler *_compressHandler)
+        : activity(*_activity), baseTmpFilename(_baseTmpFilename), grouped(_grouped), options(_options), compressHandler(_compressHandler),
+          meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer())
+    {
+        size32_t minSize = meta->getMinRecordSize();
+
+#ifdef STRESSTEST_SPILLING_ROWSTREAM
+        stressTest = activity.getOptBool("stressLookAhead");
+        if (stressTest)
+        {
+            options.inMemMaxMem = minSize * 4;
+            options.writeAheadSize = options.inMemMaxMem * 2;
+            options.tempFileGranularity = options.inMemMaxMem * 4;
+        }
+#endif
+
+        if (minSize < 16)
+            minSize = 16; // not too important, just used to cap the inMemRows queue size
+        inMemRows.setCapacity(options.inMemMaxMem / minSize);
+
+        assertex(options.writeAheadSize < options.tempFileGranularity);
+
+        // let's allow plenty of room. Allow for ~4TB of temp file usage
+        size32_t capacity = 4000LL*1000*0x100000 / options.tempFileGranularity;
+        outputFiles.setCapacity(capacity);
+    }
+    ~CCompressedSpillingRowStream()
+    {
+        while (true)
+        {
+            CFileOwner *fileOwner = nullptr;
+            if (!outputFiles.dequeue(fileOwner))
+                break;
+            fileOwner->Release();
+        }
+    }
+
+// ISmartRowBuffer
+    virtual IRowWriter *queryWriter() override
+    {
+        return this;
+    }
+// IRowStream
+    virtual const void *nextRow() override
+    {
+        switch (readState)
+        {
+            case rs_fromqueue:
+            {
+                while (true)
+                {
+                    RowEntry e;
+                    if (inMemRows.dequeue(e))
+                        return getQRow(e);
+                    else
+                    {
+                        {
+                            CLeavableCriticalBlock b(readerWriterCS);
+                            // Recheck the Q now we have the CS; if the writer was about to signal more, it may have just released the CS
+                            if (inMemRows.dequeue(e))
+                            {
+                                b.leave();
+                                return getQRow(e);
+                            }
+                            else if (outputComplete) // && (nextInputRow == nextOutputRow))
+                            {
+                                if (nextInputRow == nextOutputRow)
+                                {
+                                    handleInputComplete(); // sets readState to rs_stopped
+                                    return nullptr;
+                                }
+                                else
+                                {
+                                    // writer has finished, nothing is on the queue or will be queued, the rest is on disk
+                                    readState = rs_endstream;
+                                    const void *row = readRowFromStream();
+                                    ++nextInputRow;
+                                    return row;
+                                }
+                            }
+                            readerWaitingForQ = true;
+                        }
+                        trace("READ: waiting for Q'd rows @ %" RCPF "u (nextOutputRow = %" RCPF "u)", nextInputRow, nextOutputRow.load());
+                        moreRows.wait();
+                    }
+                }
+                return nullptr;
+            }
+            case rs_frommarker:
+            {
+                return readToMarker();
+            }
+            case rs_endstream:
+            {
+                if (nextInputRow == nextOutputRow)
+                {
+                    readState = rs_stopped;
+                    return nullptr;
+                }
+                const void *row = readRowFromStream();
+                ++nextInputRow;
+                return row;
+            }
+            case rs_stopped:
+                return nullptr;
+        }
+        return nullptr;
+    }
+    virtual void stop() override
+    {
+        CriticalBlock b(readerWriterCS);
+        handleInputComplete();
+    }
+// IRowWriter
+    virtual void putRow(const void *row) override
+    {
+        if (outputComplete)
+        {
+            // should never get here, but guard against it
+            OwnedConstThorRow tmpRow(row);
+            assertex(!row);
+            return;
+        }
+
+        if (row)
+        {
+            lastWriteWasEog = false;
+            addRow(row);
+        }
+        else // eog
+        {
+            if (lastWriteWasEog) // error, should not have two EOGs in a row
+                return;
+            else if (grouped)
+            {
+                lastWriteWasEog = true;
+                addRow(nullptr);
+            }
+            else // non-grouped nulls unexpected
+                throwUnexpected();
+        }
+    }
+    virtual void flush() override
+    {
+        // semantics of ISmartRowBuffer::flush:
+        // - tell the smartbuf that there will be no more rows written (but it should only be called after the writer has finished writing)
+        // - wait for all rows to be read from the smartbuf, or for the smartbuf to be stopped, before returning.
+        bool flushedToDisk = checkFlushToDisk(0);
+        {
+            CriticalBlock b(readerWriterCS);
+            outputComplete = true;
+            if (rs_stopped == readState)
+                return;
+            flushWaiting = true;
+            if (flushedToDisk)
+                checkReleaseReaderCommitBlocked();
+            checkReleaseQBlockReader();
+        }
+        flushWaitSem.wait();
+    }
+};
+
+
 ISmartRowBuffer * createSmartBuffer(CActivityBase *activity, const char * tempname, size32_t buffsize, IThorRowInterfaces *rowif)
 {
     Owned<IFile> file = createIFile(tempname);
@@ -617,6 +1147,11 @@ ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *activity, IThorRowInt
     return new CSmartRowInMemoryBuffer(activity, rowIf, buffsize);
 }
 
+ISmartRowBuffer * createCompressedSpillingRowStream(CActivityBase *activity, const char * tempBaseName, bool grouped, IThorRowInterfaces *rowif, const LookAheadOptions &options, ICompressHandler *compressHandler)
+{
+    return new CCompressedSpillingRowStream(activity, tempBaseName, grouped, rowif, options, compressHandler);
+}
+
 class COverflowableBuffer : public CSimpleInterface, implements IRowWriterMultiReader
 {
     CActivityBase &activity;
@@ -1940,7 +2475,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader>
-            if (serializedSz >= options.spillWriteAheadSize)
+            if (serializedSz >= options.writeAheadSize)
                 break;
         }
     }
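The subtlest part of CCompressedSpillingRowStream is the reader's catch-up protocol in getQRow()/readToMarker() above: a dequeued entry can carry a row number ahead of nextInputRow (because intervening rows were spilled rather than queued), in which case it is stashed as a marker and the gap is filled from disk. A single-threaded walkthrough of just that bookkeeping — a vector stands in for the spill files, rows are their own ids, and no Thor types are used; all of that is an assumed simplification:

    #include <cassert>
    #include <cstdio>
    #include <deque>
    #include <optional>
    #include <vector>

    int main()
    {
        // writer behaviour being simulated: rows 0,1 were queued, 2,3,4 spilled, 5 queued
        std::deque<unsigned> q = { 0, 1, 5 };     // in-memory queue (row ids)
        std::vector<unsigned> disk = { 2, 3, 4 }; // spilled rows, in write order
        size_t diskPos = 0;
        unsigned nextInputRow = 0;
        std::optional<unsigned> marker;           // stashed queue row that ran ahead

        auto emit = [&](unsigned row) { assert(row == nextInputRow); printf("row %u\n", row); ++nextInputRow; };

        while (nextInputRow < 6)
        {
            if (marker && *marker == nextInputRow) // caught up: release the stashed row
            {
                emit(*marker);
                marker.reset();
            }
            else if (marker)                       // behind the marker: fill the gap from disk
                emit(disk[diskPos++]);
            else
            {
                unsigned row = q.front();
                q.pop_front();
                if (row == nextInputRow)           // queued row is the expected one
                    emit(row);
                else
                    marker = row;                  // ran ahead: stash and fall back to disk
            }
        }
        return 0;
    }

The real reader additionally blocks (via moreRows) when the disk rows it needs have not yet been committed by the writer, which this single-threaded sketch cannot exhibit.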
diff --git a/thorlcr/thorutil/thbuf.hpp b/thorlcr/thorutil/thbuf.hpp
index dc64aeb888d..3ee9a496929 100644
--- a/thorlcr/thorutil/thbuf.hpp
+++ b/thorlcr/thorutil/thbuf.hpp
@@ -37,6 +37,19 @@ typedef QueueOf<const void, false> ThorRowQueue;
 
+struct CommonBufferRowRWStreamOptions
+{
+    offset_t storageBlockSize = 256 * 1024;             // block size of read/write streams
+    memsize_t totalCompressionBufferSize = 3000 * 1024; // compression buffer size of read streams (split between writer and outputs)
+    memsize_t inMemMaxMem = 2000 * 1024;                // before spilling begins.
+    offset_t writeAheadSize = 2000 * 1024;              // once spilling, maximum size to write ahead
+    unsigned heapFlags = roxiemem::RHFunique|roxiemem::RHFblocked;
+};
+
+struct LookAheadOptions : CommonBufferRowRWStreamOptions
+{
+    offset_t tempFileGranularity = 1000 * 0x100000; // 1GB
+};
+
 
 interface ISmartRowBuffer: extends IRowStream
@@ -55,15 +68,13 @@ extern graph_decl ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *activity,
                                                               IThorRowInterfaces *rowIf,
                                                               size32_t buffsize);
 
-struct SharedRowStreamReaderOptions
+
+extern graph_decl ISmartRowBuffer * createCompressedSpillingRowStream(CActivityBase *activity, const char * tempBasename, bool grouped, IThorRowInterfaces *rowif, const LookAheadOptions &options, ICompressHandler *compressHandler);
+
+struct SharedRowStreamReaderOptions : public CommonBufferRowRWStreamOptions
 {
-    offset_t storageBlockSize = 256 * 1024;             // block size of read/write streams
-    memsize_t totalCompressionBufferSize = 3000 * 1024; // compression buffer size of read streams (split between writer and outputs)
-    memsize_t inMemMaxMem = 2000 * 1024;                // before spilling begins.
     memsize_t inMemReadAheadGranularity = 128 * 1024;   // granularity (K) of read ahead
     rowcount_t inMemReadAheadGranularityRows = 64;      // granularity (rows) of read ahead. NB: whichever granularity is hit first
-    offset_t spillWriteAheadSize = 2000 * 1024;         // once spilling, maximum size to write ahead
-    unsigned heapFlags = roxiemem::RHFunique|roxiemem::RHFblocked;
 };
 interface ISharedRowStreamReader : extends IInterface
 {
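The option structs above now layer: LookAheadOptions and SharedRowStreamReaderOptions both inherit the fields common to the lookahead and the shared (splitter) read-ahead from CommonBufferRowRWStreamOptions. A standalone mock (same field names and defaults as the header, minus the roxiemem heap flags; this is not the Thor build) showing the relationship that CCompressedSpillingRowStream's constructor asserts between two of them:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    using memsize_t = size_t;
    using offset_t = uint64_t;

    struct CommonBufferRowRWStreamOptions
    {
        offset_t storageBlockSize = 256 * 1024;
        memsize_t totalCompressionBufferSize = 3000 * 1024;
        memsize_t inMemMaxMem = 2000 * 1024;
        offset_t writeAheadSize = 2000 * 1024;
    };

    struct LookAheadOptions : CommonBufferRowRWStreamOptions
    {
        offset_t tempFileGranularity = 1000 * 0x100000; // 1GB
    };

    int main()
    {
        LookAheadOptions options;
        options.writeAheadSize = 4096 * 1024; // e.g. from readAheadWriteAheadK
        // Rows are committed (flushed) in writeAheadSize chunks, and a temp file only
        // rolls over after tempFileGranularity bytes, so the granularity must be larger.
        assert(options.writeAheadSize < options.tempFileGranularity);
        return 0;
    }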
diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp
index cb259a7053a..d760f3d06da 100644
--- a/thorlcr/thorutil/thormisc.hpp
+++ b/thorlcr/thorutil/thormisc.hpp
@@ -59,7 +59,7 @@
 #define THOROPT_SPLITTER_READAHEADGRANULARITYK "inMemReadAheadGranularityK"       // Splitter in memory read ahead granularity (K) (default = 128K)
 #define THOROPT_SPLITTER_READAHEADGRANULARITYROWS "inMemReadAheadGranularityRows" // Splitter in memory read ahead granularity (# rows) (default = 64)
 #define THOROPT_SPLITTER_WRITEAHEADK "splitterWriteAheadK"                        // Splitter spilling write ahead size (K) (default = 2MB)
-#define THOROPT_SPLITTER_COMPRESSIONTOALK "splitterCompressionTotalK"             // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB)
+#define THOROPT_SPLITTER_COMPRESSIONTOTALK "splitterCompressionTotalK"            // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB)
 #define THOROPT_LOOP_MAX_EMPTY "loopMaxEmpty"                                     // Max # of iterations that LOOP can cycle through with 0 results before errors (default = 1000)
 #define THOROPT_SMALLSORT "smallSortThreshold"                                    // Use minisort approach, if estimate size of data to sort is below this setting (default = 0)
 #define THOROPT_PARALLEL_FUNNEL "parallelFunnel"                                  // Use parallel funnel impl. if !ordered (default = true)
@@ -121,6 +121,11 @@
 #define THOROPT_SORT_ALGORITHM "sortAlgorithm"                                    // The algorithm used to sort records (quicksort/mergesort)
 #define THOROPT_COMPRESS_ALLFILES "compressAllOutputs"                            // Compress all output files (default: bare-metal=off, cloud=on)
 #define THOROPT_AVOID_RENAME "avoidRename"                                        // Avoid rename, write directly to target physical filenames (no temp file)
+#define THOROPT_LOOKAHEAD_MAXROWMEMK "readAheadRowMemK"                           // Lookahead max memory (K) to use before spilling (default = 2MB)
+#define THOROPT_LOOKAHEAD_WRITEAHEADK "readAheadWriteAheadK"                      // Lookahead spilling write ahead size (K) (default = 2MB)
+#define THOROPT_LOOKAHEAD_COMPRESSIONTOTALK "readAheadCompressionTotalK"          // Lookahead total compression buffer size (shared between writer and reader) (K) (default = 3MB)
+#define THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY "readAheadTempFileGranularity"     // Lookahead temp file granularity (default = 1GB)
+
 
 #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000 // max of row matches before selfjoin emits warning
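For grouped streams, the spill format implemented by writeRowToStream/readRowFromStream in the thbuf.cpp diff prefixes every record with a bool end-of-group marker, and an end-of-group writes just the marker with no record. A compact sketch of that encoding and its decode loop — uint32 payloads and a stringstream stand in for the row serializer and the buffered serial streams, which is an assumed simplification:

    #include <cassert>
    #include <cstdint>
    #include <sstream>
    #include <vector>

    static void writeRow(std::ostream &out, const uint32_t *row)
    {
        bool eog = (nullptr == row);
        out.write(reinterpret_cast<const char *>(&eog), sizeof(bool)); // marker first
        if (row)
            out.write(reinterpret_cast<const char *>(row), sizeof(uint32_t));
    }

    int main()
    {
        std::stringstream buf;
        uint32_t r1 = 1, r2 = 2, r3 = 3;
        // two groups: {1, 2} {3}
        writeRow(buf, &r1);
        writeRow(buf, &r2);
        writeRow(buf, nullptr); // eog: marker only, no record follows
        writeRow(buf, &r3);

        std::vector<uint32_t> rows;
        unsigned eogs = 0;
        bool eog;
        while (buf.read(reinterpret_cast<char *>(&eog), sizeof(bool)))
        {
            if (eog) { eogs++; continue; } // end of group
            uint32_t row;
            buf.read(reinterpret_cast<char *>(&row), sizeof(uint32_t));
            rows.push_back(row);
        }
        assert(rows.size() == 3 && eogs == 1);
        return 0;
    }

This is why putRow() suppresses consecutive EOGs: a second back-to-back marker would otherwise be indistinguishable from an empty group on re-read.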