diff --git a/system/jlib/jqueue.hpp b/system/jlib/jqueue.hpp index e9447833c65..97ff9d91832 100644 --- a/system/jlib/jqueue.hpp +++ b/system/jlib/jqueue.hpp @@ -546,5 +546,59 @@ class DListOf } }; +// Lockfree Single Producer Single Consumer bounded queue implementation +// No mutexes are required to interact with the queue, as long as there's a single consumer thread (calling dequeue), and a single writer thread (calling enqueue). The capacity should be set (via the constructor or setCapacity) before the queue is used; enqueue returns false when the queue is full and dequeue returns false when it is empty. +template <typename T> +class CSPSCQueue +{ + size32_t maxCapacity = 0; + std::vector<T> elements; + std::atomic<size32_t> head = 0; + std::atomic<size32_t> tail = 0; + + inline size32_t increment(size32_t idx) const + { + size32_t next = idx+1; + if (next == maxCapacity) + next = 0; + return next; + } +public: + CSPSCQueue() + { + // should set capacity before using + } + CSPSCQueue(size32_t _maxCapacity) + : maxCapacity(_maxCapacity + 1), // +1 to distinguish full vs empty + elements(maxCapacity) + { + } + void setCapacity(size32_t _maxCapacity) + { + maxCapacity = _maxCapacity + 1; + elements.resize(maxCapacity); + } + bool enqueue(T e) + { + size32_t currentHead = head; + size32_t nextHead = increment(currentHead); + if (nextHead == tail) + return false; // full + + elements[currentHead] = std::move(e); + head = nextHead; + return true; + } + bool dequeue(T &res) + { + size32_t currentTail = tail; + if (currentTail == head) + return false; // empty + + res = std::move(elements[currentTail]); + tail = increment(currentTail); + return true; + } +}; #endif diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index 191d005fa9a..de22da08908 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -251,9 +251,9 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf if ((size32_t)-1 != blockedSequentialIOSize) options.storageBlockSize = blockedSequentialIOSize; } - options.totalCompressionBufferSize = getOptInt(THOROPT_SPLITTER_COMPRESSIONTOALK, 
options.totalCompressionBufferSize / 1024) * 1024; + options.totalCompressionBufferSize = getOptInt(THOROPT_SPLITTER_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024; options.inMemMaxMem = getOptInt(THOROPT_SPLITTER_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024; - options.spillWriteAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.spillWriteAheadSize / 1024) * 1024; + options.writeAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.writeAheadSize / 1024) * 1024; options.inMemReadAheadGranularity = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYK, options.inMemReadAheadGranularity / 1024) * 1024; options.inMemReadAheadGranularityRows = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYROWS, options.inMemReadAheadGranularity); options.heapFlags = getOptInt("spillheapflags", options.heapFlags); diff --git a/thorlcr/activities/thactivityutil.cpp b/thorlcr/activities/thactivityutil.cpp index f5701672c18..fe9a960bb74 100644 --- a/thorlcr/activities/thactivityutil.cpp +++ b/thorlcr/activities/thactivityutil.cpp @@ -66,6 +66,8 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf rowcount_t required; Semaphore startSem; Owned getexception; + LookAheadOptions options; + bool newLookAhead = false; class CThread: public Thread { @@ -94,12 +96,19 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf { try { - StringBuffer temp; - if (allowspill) - GetTempFilePath(temp,"lookahd"); assertex(bufsize); if (allowspill) - smartbuf.setown(createSmartBuffer(&activity, temp.str(), bufsize, rowIf)); + { + StringBuffer temp; + GetTempFilePath(temp,"lookahd"); + if (newLookAhead) + { + ICompressHandler *compressHandler = options.totalCompressionBufferSize ? 
queryDefaultCompressHandler() : nullptr; + smartbuf.setown(createCompressedSpillingRowStream(&activity, temp.str(), preserveGrouping, rowIf, options, compressHandler)); + } + else + smartbuf.setown(createSmartBuffer(&activity, temp.str(), bufsize, rowIf)); + } else smartbuf.setown(createSmartInMemoryBuffer(&activity, rowIf, bufsize)); startSem.signal(); @@ -207,6 +216,29 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf running = true; required = _required; count = 0; + + newLookAhead = activity.getOptBool("newlookahead", false); + if (activity.getOptBool("forcenewlookahead")) + { + newLookAhead = true; + allowspill = true; + } + + // for "newlookahead" only + if (isContainerized()) + { + // JCSMORE - add CJobBase::getTempBlockSize() to calc. once. + StringBuffer planeName; + if (!getDefaultPlane(planeName, "@tempPlane", "temp")) + getDefaultPlane(planeName, "@spillPlane", "spill"); + size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1); + if ((size32_t)-1 != blockedSequentialIOSize) + options.storageBlockSize = blockedSequentialIOSize; + } + options.totalCompressionBufferSize = activity.getOptInt(THOROPT_LOOKAHEAD_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024; + options.inMemMaxMem = activity.getOptInt(THOROPT_LOOKAHEAD_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024; + options.writeAheadSize = activity.getOptInt64(THOROPT_LOOKAHEAD_WRITEAHEADK, options.writeAheadSize / 1024) * 1024; + options.tempFileGranularity = activity.getOptInt64(THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY, options.tempFileGranularity / 0x100000) * 0x100000; } ~CRowStreamLookAhead() { diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 951b7db82d2..b1377f6db00 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -16,11 +16,14 @@ ############################################################################## */ #include +#include #include +#include #include "platform.h" 
#include #include #include "jlib.hpp" +#include "jqueue.hpp" #include "jmisc.hpp" #include "jio.hpp" #include "jlzw.hpp" @@ -606,6 +609,576 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff } }; + +static std::tuple createSerialInputStream(IFile *iFile, ICompressHandler *compressHandler, const CommonBufferRowRWStreamOptions &options, unsigned numSharingCompressionBuffer) +{ + Owned iFileIO = iFile->open(IFOread); + Owned in = createSerialInputStream(iFileIO); + Owned inputStream = createBufferedInputStream(in, options.storageBlockSize, 0); + if (compressHandler) + { + const char *decompressOptions = nullptr; // at least for now! + Owned decompressor = compressHandler->getExpander(decompressOptions); + Owned decompressed = createDecompressingInputStream(inputStream, decompressor); + + size32_t compressionBlockSize = (size32_t)(options.totalCompressionBufferSize / numSharingCompressionBuffer); + if (compressionBlockSize < options.minCompressionBlockSize) + { + WARNLOG("Shared totalCompressionBufferSize=%" I64F "u, too small for number of numSharingCompressionBuffer(%u). Using minCompressionBlockSize(%u).", (unsigned __int64)options.totalCompressionBufferSize, numSharingCompressionBuffer, options.minCompressionBlockSize); + compressionBlockSize = options.minCompressionBlockSize; + } + inputStream.setown(createBufferedInputStream(decompressed, compressionBlockSize, 0)); + } + return { inputStream.getClear(), iFileIO.getClear() }; +} + +static std::tuple createSerialOutputStream(IFile *iFile, ICompressHandler *compressHandler, const CommonBufferRowRWStreamOptions &options, unsigned numSharingCompressionBuffer) +{ + Owned iFileIO = iFile->open(IFOcreate); // kept for stats purposes + Owned out = createSerialOutputStream(iFileIO); + Owned outputStream = createBufferedOutputStream(out, options.storageBlockSize); //prefered plane block size + if (compressHandler) + { + const char *compressOptions = nullptr; // at least for now! 
+ Owned compressor = compressHandler->getCompressor(compressOptions); + Owned compressed = createCompressingOutputStream(outputStream, compressor); + size32_t compressionBlockSize = (size32_t)(options.totalCompressionBufferSize / numSharingCompressionBuffer); + if (compressionBlockSize < options.minCompressionBlockSize) + { + WARNLOG("Shared totalCompressionBufferSize=%" I64F "u, too small for number of numSharingCompressionBuffer(%u). Using minCompressionBlockSize(%u).", (unsigned __int64)options.totalCompressionBufferSize, numSharingCompressionBuffer, options.minCompressionBlockSize); + compressionBlockSize = options.minCompressionBlockSize; + } + + outputStream.setown(createBufferedOutputStream(compressed, compressionBlockSize)); + } + return { outputStream.getClear(), iFileIO.getClear() }; +} + +// #define TRACE_SPILLING_ROWSTREAM // traces each row read/written, and other events + +// based on query that produces records with a single sequential (from 1) unsigned4 +// #define VERIFY_ROW_IDS_SPILLING_ROWSTREAM + +// for 'stressLookAhead' code. When enabled, reduces buffer sizes etc. to stress test the lookahead spilling +// #define STRESSTEST_SPILLING_ROWSTREAM + + + +/* CCompressedSpillingRowStream implementation details: + - Writer: + - The writer writes to an in-memory queue; when the queue is full, or queuing the row would exceed LookAheadOptions::inMemMaxMem, it starts writing to temp files. + - The writer will always write to the queue if it can, even after it has started spilling. 
+ - The writer commits to disk at LookAheadOptions::writeAheadSize granularity + - The writer creates a new temp file when the current one reaches LookAheadOptions::tempFileGranularity + - The writer pushes the current nextOutputRow to a queue (outputFileEndRowMarkers) when it creates the next output file (used by the reader to know when to move to the next file) + - NB: writer implements ISmartRowBuffer::flush() which has slightly weird semantics (blocks until everything is read or stopped) +- Reader: + - The reader will read from the queue until it is exhausted, and then block until it is signalled that there are more rows. + - If the reader dequeues a row that is ahead of the expected 'nextInputRow', it will stash it, and read from disk until it catches up to that row. + - If the reader is reading from disk and it catches up with 'committedRows' it will block until the writer has committed more rows. + - When reading from a temp file, it will take ownership of the CFileOwner and dispose of the underlying file when it has consumed it. + - The reader will read from the stream until it hits 'currentTempFileEndRow' (initially 0), at which point it will open the next temp file. 
+ */ + +// NB: Supports being read by 1 thread and written to by another only +class CCompressedSpillingRowStream: public CSimpleInterfaceOf, implements IRowWriter +{ + typedef std::tuple RowEntry; + + CActivityBase &activity; // ctor input parameter + StringAttr baseTmpFilename; // ctor input parameter + LookAheadOptions options; // ctor input parameter + Linked compressHandler; // ctor input parameter + + // derived from input paramter (IThorRowInterfaces *rowIf) + Linked meta; + Linked serializer; + Linked allocator; + Linked deserializer; + memsize_t compressionBlockSize = 0; // filled in createOutputStream + + // in-memory related members + CSPSCQueue inMemRows; + std::atomic inMemRowsMemoryUsage = 0; // NB updated from writer and reader threads + Semaphore moreRows; + std::atomic readerWaitingForQ = false; // set by reader, cleared by writer + + // temp write related members + Owned outputStream; + std::unique_ptr outputStreamSerializer; + memsize_t pendingFlushToDiskSz = 0; + offset_t currentTempFileSize = 0; + CFileOwner *currentOwnedOutputFile = nullptr; + Owned currentOutputIFileIO; // keep for stats + CriticalSection outputFilesQCS; + std::queue outputFiles; + unsigned writeTempFileNum = 0; + std::atomic nextOutputRow = 0; // read by reader, updated by writer + std::atomic committedRows = 0; // read by reader, updated by writer + std::atomic spilt = false; // set by createOutputStream, checked by reader + std::queue outputFileEndRowMarkers; + bool lastWriteWasEog = false; + bool outputComplete = false; // only accessed and modified by writer or reader within readerWriterCS + bool recentlyQueued = false; + CriticalSection outputStreamCS; + + // temp read related members + std::atomic currentTempFileEndRow = 0; + Owned currentInputIFileIO; // keep for stats + Linked currentOwnedInputFile; + Owned inputStream; + CThorStreamDeserializerSource inputDeserializerSource; + rowcount_t nextInputRow = 0; + bool readerWaitingForCommit = false; + static constexpr 
unsigned readerWakeupGranularity = 32; // how often to wake up the reader if it is waiting for more rows + enum ReadState { rs_fromqueue, rs_frommarker, rs_endstream, rs_stopped } readState = rs_fromqueue; + RowEntry readFromStreamMarker = { nullptr, 0, 0 }; + + // misc + bool grouped = false; // ctor input parameter + CriticalSection readerWriterCS; +#ifdef STRESSTEST_SPILLING_ROWSTREAM + bool stressTest = false; +#endif + + // annoying flush semantics + bool flushWaiting = false; + Semaphore flushWaitSem; + + + void trace(const char *format, ...) + { +#ifdef TRACE_SPILLING_ROWSTREAM + va_list args; + va_start(args, format); + VALOG(MCdebugInfo, format, args); + va_end(args); +#endif + } + void createNextOutputStream() + { + VStringBuffer tmpFilename("%s.%u", baseTmpFilename.get(), writeTempFileNum++); + trace("WRITE: writing to %s", tmpFilename.str()); + Owned iFile = createIFile(tmpFilename); + currentOwnedOutputFile = new CFileOwner(iFile, activity.queryTempFileSizeTracker()); // used by checkFlushToDisk to noteSize + { + CriticalBlock b(outputFilesQCS); + outputFiles.push(currentOwnedOutputFile); // NB: takes ownership + } + + auto res = createSerialOutputStream(iFile, compressHandler, options, 2); // (2) input & output sharing totalCompressionBufferSize + outputStream.setown(std::get<0>(res)); + currentOutputIFileIO.setown(std::get<1>(res)); + outputStreamSerializer = std::make_unique(outputStream); + } + void createNextInputStream() + { + CFileOwner *dequeuedOwnedIFile = nullptr; + { + CriticalBlock b(outputFilesQCS); + dequeuedOwnedIFile = outputFiles.front(); + outputFiles.pop(); + } + currentOwnedInputFile.setown(dequeuedOwnedIFile); + IFile *iFile = ¤tOwnedInputFile->queryIFile(); + trace("READ: reading from %s", iFile->queryFilename()); + + auto res = createSerialInputStream(iFile, compressHandler, options, 2); // (2) input & output sharing totalCompressionBufferSize + inputStream.setown(std::get<0>(res)); + currentInputIFileIO.setown(std::get<1>(res)); 
+ inputDeserializerSource.setStream(inputStream); + } + const void *readRowFromStream() + { + // readRowFromStream() called from readToMarker (which will block before calling this if behind committedRows), + // or when outputComplete. + // Either way, it will not enter this method until the writer has committed ahead of the reader nextInputRow + + // NB: currentTempFileEndRow will be 0 if 1st input read + // nextInputRow can be > currentTempFileEndRow, because the writer/read may have used the Q + // beyond this point, the next row in the stream could be anywhere above. + if (nextInputRow >= currentTempFileEndRow) + { + createNextInputStream(); + CriticalBlock b(outputStreamCS); + if (nextInputRow >= currentTempFileEndRow) + { + if (!outputFileEndRowMarkers.empty()) + { + currentTempFileEndRow = outputFileEndRowMarkers.front(); + outputFileEndRowMarkers.pop(); + assertex(currentTempFileEndRow > nextInputRow); + } + else + { + currentTempFileEndRow = (rowcount_t)-1; // unbounded for now, writer will set when it knows + trace("READ: setting currentTempFileEndRow: unbounded"); + } + } + } + if (grouped) + { + bool eog; + inputStream->read(sizeof(bool), &eog); + if (eog) + return nullptr; + } + RtlDynamicRowBuilder rowBuilder(allocator); + size32_t sz = deserializer->deserialize(rowBuilder, inputDeserializerSource); + const void *row = rowBuilder.finalizeRowClear(sz); + checkCurrentRow("S: ", row, nextInputRow); + return row; + } + void writeRowToStream(const void *row, size32_t rowSz) + { + if (!spilt) + { + spilt = true; + ActPrintLog(&activity, "Spilling to temp storage [file = %s]", baseTmpFilename.get()); + createNextOutputStream(); + } + if (grouped) + { + bool eog = (nullptr == row); + outputStream->put(sizeof(bool), &eog); + pendingFlushToDiskSz++; + if (nullptr == row) + return; + } + serializer->serialize(*outputStreamSerializer.get(), (const byte *)row); + pendingFlushToDiskSz += rowSz; + } + void checkReleaseQBlockReader() + { + if (readerWaitingForQ) + { + 
readerWaitingForQ = false; + moreRows.signal(); + } + } + void checkReleaseReaderCommitBlocked() + { + if (readerWaitingForCommit) + { + readerWaitingForCommit = false; + moreRows.signal(); + } + } + void handleInputComplete() + { + readState = rs_stopped; + if (flushWaiting) + { + flushWaiting = false; + flushWaitSem.signal(); + } + } + bool checkFlushToDisk(size32_t threshold) + { + if (pendingFlushToDiskSz <= threshold) + return false; + rowcount_t currentNextOutputRow = nextOutputRow.load(); + trace("WRITE: Flushed to disk. nextOutputRow = %" RCPF "u", currentNextOutputRow); + outputStream->flush(); + currentTempFileSize += pendingFlushToDiskSz; + currentOwnedOutputFile->noteSize(currentTempFileSize); + pendingFlushToDiskSz = 0; + if (currentTempFileSize > options.tempFileGranularity) + { + currentTempFileSize = 0; + { + CriticalBlock b(outputStreamCS); + // set if reader isn't bounded yet, or queue next boundary + if ((rowcount_t)-1 == currentTempFileEndRow) + { + currentTempFileEndRow = currentNextOutputRow; + trace("WRITE: setting currentTempFileEndRow: %" RCPF "u", currentTempFileEndRow.load()); + } + else + { + outputFileEndRowMarkers.push(currentNextOutputRow); + trace("WRITE: adding to tempFileEndRowMarker(size=%u): %" RCPF "u", (unsigned)outputFileEndRowMarkers.size(), currentNextOutputRow); + } + } + createNextOutputStream(); + } + committedRows = currentNextOutputRow; + return true; + } + void addRow(const void *row) + { + bool queued = false; + size32_t rowSz = row ? 
thorRowMemoryFootprint(serializer, row) : 0; + if (rowSz + inMemRowsMemoryUsage <= options.inMemMaxMem) + queued = inMemRows.enqueue({ row, nextOutputRow, rowSz }); // takes ownership of 'row' if successful + if (queued) + { + trace("WRITE: Q: nextOutputRow: %" RCPF "u", nextOutputRow.load()); + inMemRowsMemoryUsage += rowSz; + ++nextOutputRow; + recentlyQueued = true; + } + else + { + trace("WRITE: S: nextOutputRow: %" RCPF "u", nextOutputRow.load()); + writeRowToStream(row, rowSz); // JCSMORE - rowSz is memory not disk size... does it matter that much? + ::ReleaseThorRow(row); + ++nextOutputRow; + if (checkFlushToDisk(options.writeAheadSize)) + { + CriticalBlock b(readerWriterCS); + checkReleaseReaderCommitBlocked(); + } + } + + // do not wake up reader every time a row is queued (but granularly) to avoid excessive flapping + if (recentlyQueued && (0 == (nextOutputRow % readerWakeupGranularity))) + { + recentlyQueued = false; + CriticalBlock b(readerWriterCS); + checkReleaseQBlockReader(); + } + } + const void *getQRow(RowEntry &e) + { + rowcount_t writeRow = std::get<1>(e); + inMemRowsMemoryUsage -= std::get<2>(e); + if (writeRow == nextInputRow) + { +#ifdef STRESSTEST_SPILLING_ROWSTREAM + if (stressTest && (0 == (nextInputRow % 100))) + MilliSleep(5); +#endif + + const void *row = std::get<0>(e); + checkCurrentRow("Q: ", row, nextInputRow); + ++nextInputRow; + return row; + } + else + { + // queued row is ahead of reader position, save marker and read from stream until marker + dbgassertex(writeRow > nextInputRow); + readFromStreamMarker = e; + readState = rs_frommarker; + return readToMarker(); + } + + } + inline void checkCurrentRow(const char *msg, const void *row, rowcount_t expectedId) + { +#ifdef VERIFY_ROW_IDS_SPILLING_ROWSTREAM + unsigned id; + memcpy(&id, row, sizeof(unsigned)); + assertex(id-1 == expectedId); + trace("READ: %s nextInputRow: %" RCPF "u", msg, expectedId); +#endif + } + const void *readToMarker() + { + rowcount_t markerRow = 
std::get<1>(readFromStreamMarker); + if (markerRow == nextInputRow) + { + const void *ret = std::get<0>(readFromStreamMarker); + checkCurrentRow("M: ", ret, nextInputRow); + readFromStreamMarker = { nullptr, 0, 0 }; + readState = rs_fromqueue; + ++nextInputRow; + return ret; + } + else if (nextInputRow >= committedRows) // row we need have not yet been committed to disk. + { + CLeavableCriticalBlock b(readerWriterCS); + if (nextInputRow >= committedRows) + { + // wait for writer to commit + readerWaitingForCommit = true; + b.leave(); + trace("READ: waiting for committedRows(currently = %" RCPF "u) to catch up to nextInputRow = %" RCPF "u", committedRows.load(), nextInputRow); + moreRows.wait(); + assertex(nextInputRow < committedRows); + } + } + const void *row = readRowFromStream(); + ++nextInputRow; + return row; + } +public: + IMPLEMENT_IINTERFACE_O_USING(CSimpleInterfaceOf); + + explicit CCompressedSpillingRowStream(CActivityBase *_activity, const char *_baseTmpFilename, bool _grouped, IThorRowInterfaces *rowIf, const LookAheadOptions &_options, ICompressHandler *_compressHandler) + : activity(*_activity), baseTmpFilename(_baseTmpFilename), grouped(_grouped), options(_options), compressHandler(_compressHandler), + meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()) + { + size32_t minSize = meta->getMinRecordSize(); + +#ifdef STRESSTEST_SPILLING_ROWSTREAM + stressTest = activity.getOptBool("stressLookAhead"); + if (stressTest) + { + options.inMemMaxMem = minSize * 4; + options.writeAheadSize = options.inMemMaxMem * 2; + options.tempFileGranularity = options.inMemMaxMem * 4; + if (options.tempFileGranularity < 0x10000) // stop silly sizes (NB: this would only be set so small for testing!) 
+ options.tempFileGranularity = 0x10000; + } +#endif + + if (minSize < 16) + minSize = 16; // not too important, just using to cap inMemRows queue size + inMemRows.setCapacity(options.inMemMaxMem / minSize); + + assertex(options.writeAheadSize < options.tempFileGranularity); + } + ~CCompressedSpillingRowStream() + { + while (!outputFiles.empty()) + { + ::Release(outputFiles.front()); + outputFiles.pop(); + } + RowEntry e; + while (true) + { + if (!inMemRows.dequeue(e)) + break; + const void *row = std::get<0>(e); + if (row) + ReleaseThorRow(row); + } + const void *markerRow = std::get<0>(readFromStreamMarker); + if (markerRow) + ReleaseThorRow(markerRow); + } + +// ISmartRowBuffer + virtual IRowWriter *queryWriter() override + { + return this; + } +// IRowStream + virtual const void *nextRow() override + { + switch (readState) + { + case rs_fromqueue: + { + while (true) + { + RowEntry e; + if (inMemRows.dequeue(e)) + return getQRow(e); + else + { + { + CLeavableCriticalBlock b(readerWriterCS); + // Recheck Q now have CS, if reader here and writer ready to signal more, then it may have just released CS + if (inMemRows.dequeue(e)) + { + b.leave(); + return getQRow(e); + } + else if (outputComplete)// && (nextInputRow == nextOutputRow)) + { + if (nextInputRow == nextOutputRow) + { + handleInputComplete(); // sets readState to rs_stopped + return nullptr; + } + else + { + // writer has finished, nothing is on the queue or will be queued, rest is on disk + readState = rs_endstream; + const void *row = readRowFromStream(); + ++nextInputRow; + return row; + } + } + readerWaitingForQ = true; + } + trace("READ: waiting for Q'd rows @ %" RCPF "u (nextOutputRow = %" RCPF "u)", nextInputRow, nextOutputRow.load()); + moreRows.wait(); + } + } + return nullptr; + } + case rs_frommarker: + { + return readToMarker(); + } + case rs_endstream: + { + if (nextInputRow == nextOutputRow) + { + readState = rs_stopped; + return nullptr; + } + const void *row = readRowFromStream(); + 
++nextInputRow; + return row; + } + case rs_stopped: + return nullptr; + } + throwUnexpected(); + } + virtual void stop() override + { + CriticalBlock b(readerWriterCS); + handleInputComplete(); + } +// IRowWriter + virtual void putRow(const void *row) override + { + if (outputComplete) + { + // should never get here, but guard against. + OwnedConstThorRow tmpRow(row); + assertex(!row); + return; + } + + if (row) + { + lastWriteWasEog = false; + addRow(row); + } + else // eog + { + if (lastWriteWasEog) // error, should not have two EOGs in a row + return; + else if (grouped) + { + lastWriteWasEog = true; + addRow(nullptr); + } + else // non-grouped nulls unexpected + throwUnexpected(); + } + } + virtual void flush() override + { + // semantics of ISmartRowBuffer::flush: + // - tell smartbuf that there will be no more rows written (BUT should only be called after finished writing) + // - wait for all rows to be read from smartbuf, or smartbuf stopped before returning. + + bool flushedToDisk = checkFlushToDisk(0); + { + CriticalBlock b(readerWriterCS); + outputComplete = true; + if (rs_stopped == readState) + return; + flushWaiting = true; + if (flushedToDisk) + checkReleaseReaderCommitBlocked(); + checkReleaseQBlockReader(); + } + flushWaitSem.wait(); + } +}; + + + ISmartRowBuffer * createSmartBuffer(CActivityBase *activity, const char * tempname, size32_t buffsize, IThorRowInterfaces *rowif) { Owned file = createIFile(tempname); @@ -617,6 +1190,11 @@ ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *activity, IThorRowInt return new CSmartRowInMemoryBuffer(activity, rowIf, buffsize); } +ISmartRowBuffer * createCompressedSpillingRowStream(CActivityBase *activity, const char * tempBaseName, bool grouped, IThorRowInterfaces *rowif, const LookAheadOptions &options, ICompressHandler *compressHandler) +{ + return new CCompressedSpillingRowStream(activity, tempBaseName, grouped, rowif, options, compressHandler); +} + class COverflowableBuffer : public 
CSimpleInterface, implements IRowWriterMultiReader { CActivityBase &activity; @@ -1844,6 +2422,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf input; + unsigned numOutputs = 0; Linked meta; Linked serializer; Linked deserializer; @@ -1863,7 +2442,6 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfopen(IFOcreate)); // kept for stats purposes - Owned out = createSerialOutputStream(iFileIO); - outputStream.setown(createBufferedOutputStream(out, options.storageBlockSize)); //prefered plane block size - if (compressHandler) - { - const char *compressOptions = nullptr; - Owned compressor = compressHandler->getCompressor(compressOptions); - Owned compressed = createCompressingOutputStream(outputStream, compressor); - outputStream.setown(createBufferedOutputStream(compressed, compressionBlockSize)); - } + auto res = createSerialOutputStream(iFile, compressHandler, options, numOutputs + 1); + outputStream.setown(std::get<0>(res)); + iFileIO.setown(std::get<1>(res)); totalInputRowsRead = inMemTotalRows; } void writeRowsFromInput() @@ -1940,7 +2511,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf= options.spillWriteAheadSize) + if (serializedSz >= options.writeAheadSize) break; } } @@ -1957,8 +2528,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf(row)); } public: - explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler) - : activity(*_activity), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), + explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler) + : 
activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()) { assertex(input); @@ -1968,17 +2539,6 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf options.inMemMaxMem) inMemReadAheadGranularity = options.inMemMaxMem; - constexpr size32_t minCompressionBlockSize = 256 * 1024; - memsize_t totalCompressionBufferSize = options.totalCompressionBufferSize; - if (totalCompressionBufferSize) - { - compressionBlockSize = (size32_t)(totalCompressionBufferSize / (numOutputs + 1)); // +1 for writer - if (compressionBlockSize < minCompressionBlockSize) - { - WARNLOG("Shared totalCompressionBufferSize=%" I64F "u, too small for number of outputs(%u). Using minCompressionBlockSize(%u) for writer and each reader.", (unsigned __int64)totalCompressionBufferSize, numOutputs, minCompressionBlockSize); - compressionBlockSize = minCompressionBlockSize; - } - } for (unsigned o=0; o getReadStream() // also pass back IFileIO for stats purposes { - Owned iFileIO = iFile->open(IFOread); - Owned in = createSerialInputStream(iFileIO); - Owned inputStream = createBufferedInputStream(in, options.storageBlockSize, 0); - if (compressHandler) - { - const char *decompressOptions = nullptr; - Owned decompressor = compressHandler->getExpander(decompressOptions); - Owned decompressed = createDecompressingInputStream(inputStream, decompressor); - inputStream.setown(createBufferedInputStream(decompressed, compressionBlockSize, 0)); - } - return { inputStream.getClear(), iFileIO.getClear() }; + return createSerialInputStream(iFile, compressHandler, options, numOutputs + 1); // +1 for writer } bool checkWriteAhead(rowcount_t &outputRowsAvailable) { diff --git a/thorlcr/thorutil/thbuf.hpp b/thorlcr/thorutil/thbuf.hpp index dc64aeb888d..1750f63b007 100644 --- 
a/thorlcr/thorutil/thbuf.hpp +++ b/thorlcr/thorutil/thbuf.hpp @@ -37,6 +37,25 @@ typedef QueueOf ThorRowQueue; +struct CommonBufferRowRWStreamOptions +{ + offset_t storageBlockSize = 256 * 1024; // block size of read/write streams + size32_t minCompressionBlockSize = 256 * 1024; // minimum block size for compression + memsize_t totalCompressionBufferSize = 3000 * 1024; // compression buffer size of read streams (split between writer and outputs) + memsize_t inMemMaxMem = 2000 * 1024; // before spilling begins. + offset_t writeAheadSize = 2000 * 1024; // once spilling, maximum size to write ahead + unsigned heapFlags = roxiemem::RHFunique|roxiemem::RHFblocked; +}; + +struct LookAheadOptions : CommonBufferRowRWStreamOptions +{ + LookAheadOptions() + { + // override defaults + totalCompressionBufferSize = 2000 * 1024; // total compression buffer size (shared between the lookahead's writer stream and reader stream) + } + offset_t tempFileGranularity = 1000 * 0x100000; // size at which the writer starts a new temp file (1000MB, ~1GB) +}; interface ISmartRowBuffer: extends IRowStream @@ -55,15 +74,13 @@ extern graph_decl ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *act IThorRowInterfaces *rowIf, size32_t buffsize); -struct SharedRowStreamReaderOptions + +extern graph_decl ISmartRowBuffer * createCompressedSpillingRowStream(CActivityBase *activity, const char * tempBasename, bool grouped, IThorRowInterfaces *rowif, const LookAheadOptions &options, ICompressHandler *compressHandler); + +struct SharedRowStreamReaderOptions : public CommonBufferRowRWStreamOptions { - offset_t storageBlockSize = 256 * 1024; // block size of read/write streams - memsize_t totalCompressionBufferSize = 3000 * 1024; // compression buffer size of read streams (split between writer and outputs) - memsize_t inMemMaxMem = 2000 * 1024; // before spilling begins. memsize_t inMemReadAheadGranularity = 128 * 1024; // granularity (K) of read ahead rowcount_t inMemReadAheadGranularityRows = 64; // granularity (rows) of read ahead. 
NB: whichever granularity is hit first - offset_t spillWriteAheadSize = 2000 * 1024; // once spilling, maximum size to write ahead - unsigned heapFlags = roxiemem::RHFunique|roxiemem::RHFblocked; }; interface ISharedRowStreamReader : extends IInterface { diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index cb259a7053a..d760f3d06da 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -59,7 +59,7 @@ #define THOROPT_SPLITTER_READAHEADGRANULARITYK "inMemReadAheadGranularityK" // Splitter in memory read ahead granularity (K) (default = 128K) #define THOROPT_SPLITTER_READAHEADGRANULARITYROWS "inMemReadAheadGranularityRows" // Splitter in memory read ahead granularity (# rows) (default = 64) #define THOROPT_SPLITTER_WRITEAHEADK "splitterWriteAheadK" // Splitter spilling write ahead size (K) (default = 2MB) -#define THOROPT_SPLITTER_COMPRESSIONTOALK "splitterCompressionTotalK" // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB) +#define THOROPT_SPLITTER_COMPRESSIONTOTALK "splitterCompressionTotalK" // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB) #define THOROPT_LOOP_MAX_EMPTY "loopMaxEmpty" // Max # of iterations that LOOP can cycle through with 0 results before errors (default = 1000) #define THOROPT_SMALLSORT "smallSortThreshold" // Use minisort approach, if estimate size of data to sort is below this setting (default = 0) #define THOROPT_PARALLEL_FUNNEL "parallelFunnel" // Use parallel funnel impl. 
if !ordered (default = true) @@ -121,6 +121,11 @@ #define THOROPT_SORT_ALGORITHM "sortAlgorithm" // The algorithm used to sort records (quicksort/mergesort) #define THOROPT_COMPRESS_ALLFILES "compressAllOutputs" // Compress all output files (default: bare-metal=off, cloud=on) #define THOROPT_AVOID_RENAME "avoidRename" // Avoid rename, write directly to target physical filenames (no temp file) +#define THOROPT_LOOKAHEAD_MAXROWMEMK "readAheadRowMemK" // Lookahead max memory (K) to use before spilling (default = 2MB) +#define THOROPT_LOOKAHEAD_WRITEAHEADK "readAheadWriteAheadK" // Lookahead spilling write ahead size (K) (default = 2MB) +#define THOROPT_LOOKAHEAD_COMPRESSIONTOTALK "readAheadCompressionTotalK" // Lookahead total compression buffer size (shared between writer and reader) (K) (default = 2MB) +#define THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY "readAheadTempFileGranularity" // Lookahead temp file granularity (MB) (default = 1000MB) + #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000 // max of row matches before selfjoin emits warning