diff --git a/system/jlib/jqueue.hpp b/system/jlib/jqueue.hpp index d9d33d1aed4..97ff9d91832 100644 --- a/system/jlib/jqueue.hpp +++ b/system/jlib/jqueue.hpp @@ -546,6 +546,8 @@ class DListOf } }; +// Lockfree Single Producer Single Consumer bounded queue implementation +// No mutexes are required to interact with the queue, as long as there's a single consumer thread, and a single writer thread. template class CSPSCQueue { @@ -556,7 +558,10 @@ class CSPSCQueue inline size32_t increment(size32_t idx) const { - return (idx + 1) % maxCapacity; + size32_t next = idx+1; + if (next == maxCapacity) + next = 0; + return next; } public: CSPSCQueue() diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 88a8904c694..f52f4a2225d 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -16,6 +16,7 @@ ############################################################################## */ #include +#include #include #include #include "platform.h" @@ -608,6 +609,51 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff } }; + +static std::tuple createSerialInputStream(IFile *iFile, ICompressHandler *compressHandler, const CommonBufferRowRWStreamOptions &options, unsigned numSharingCompressionBuffer) +{ + Owned iFileIO = iFile->open(IFOread); + Owned in = createSerialInputStream(iFileIO); + Owned inputStream = createBufferedInputStream(in, options.storageBlockSize, 0); + if (compressHandler) + { + const char *decompressOptions = nullptr; // at least for now! + Owned decompressor = compressHandler->getExpander(decompressOptions); + Owned decompressed = createDecompressingInputStream(inputStream, decompressor); + + size32_t compressionBlockSize = (size32_t)(options.totalCompressionBufferSize / numSharingCompressionBuffer); + if (compressionBlockSize < options.minCompressionBlockSize) + { + WARNLOG("Shared totalCompressionBufferSize=%" I64F "u, too small for number of numSharingCompressionBuffer(%u). 
Using minCompressionBlockSize(%u).", (unsigned __int64)options.totalCompressionBufferSize, numSharingCompressionBuffer, options.minCompressionBlockSize); + compressionBlockSize = options.minCompressionBlockSize; + } + inputStream.setown(createBufferedInputStream(decompressed, compressionBlockSize, 0)); + } + return { inputStream.getClear(), iFileIO.getClear() }; +} + +static std::tuple createSerialOutputStream(IFile *iFile, ICompressHandler *compressHandler, const CommonBufferRowRWStreamOptions &options, unsigned numSharingCompressionBuffer) +{ + Owned iFileIO = iFile->open(IFOcreate); // kept for stats purposes + Owned out = createSerialOutputStream(iFileIO); + Owned outputStream = createBufferedOutputStream(out, options.storageBlockSize); //preferred plane block size + if (compressHandler) + { + const char *compressOptions = nullptr; // at least for now! + Owned compressor = compressHandler->getCompressor(compressOptions); + Owned compressed = createCompressingOutputStream(outputStream, compressor); + size32_t compressionBlockSize = (size32_t)(options.totalCompressionBufferSize / numSharingCompressionBuffer); + if (compressionBlockSize < options.minCompressionBlockSize) + { + WARNLOG("Shared totalCompressionBufferSize=%" I64F "u, too small for number of numSharingCompressionBuffer(%u). 
Using minCompressionBlockSize(%u).", (unsigned __int64)options.totalCompressionBufferSize, numSharingCompressionBuffer, options.minCompressionBlockSize); + compressionBlockSize = options.minCompressionBlockSize; + } + + outputStream.setown(createBufferedOutputStream(compressed, compressionBlockSize)); + } + return { outputStream.getClear(), iFileIO.getClear() }; +} + //#define TRACE_SPILLING_ROWSTREAM // traces each row read/written, and other events // based on query that produces records with a single sequential (from 1) unsigned4 @@ -661,15 +707,17 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, std::unique_ptr outputStreamSerializer; memsize_t pendingFlushToDiskSz = 0; offset_t currentTempFileSize = 0; + CFileOwner *currentOwnedOutputFile = nullptr; Owned currentOutputIFileIO; // keep for stats - CSPSCQueue outputFiles; + CriticalSection outputFilesQCS; + std::queue outputFiles; unsigned writeTempFileNum = 0; std::atomic nextOutputRow = 0; // read by reader, updated by writer std::atomic committedRows = 0; // read by reader, updated by writer std::atomic spilt = false; // set by createOutputStream, checked by reader - std::deque outputFileEndRowMarkers; + std::queue outputFileEndRowMarkers; bool lastWriteWasEog = false; - bool outputComplete = false; + bool outputComplete = false; // only accessed and modified by writer or reader within readerWriterCS bool recentlyQueued = false; CriticalSection outputStreamCS; @@ -711,42 +759,40 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, VStringBuffer tmpFilename("%s.%u", baseTmpFilename.get(), writeTempFileNum++); trace("WRITE: writing to %s", tmpFilename.str()); Owned iFile = createIFile(tmpFilename); - assertex(outputFiles.enqueue(new CFileOwner(iFile, activity.queryTempFileSizeTracker()))); - - currentOutputIFileIO.setown(iFile->open(IFOcreate)); // kept for stats purposes - Owned out = createSerialOutputStream(currentOutputIFileIO); - 
outputStream.setown(createBufferedOutputStream(out, options.storageBlockSize)); //prefered plane block size - if (compressHandler) + currentOwnedOutputFile = new CFileOwner(iFile, activity.queryTempFileSizeTracker()); // used by checkFlushToDisk to noteSize { - const char *compressOptions = nullptr; - Owned compressor = compressHandler->getCompressor(compressOptions); - Owned compressed = createCompressingOutputStream(outputStream, compressor); - compressionBlockSize = std::max(options.totalCompressionBufferSize, (memsize_t)(256 * 1024)); - outputStream.setown(createBufferedOutputStream(compressed, compressionBlockSize)); + CriticalBlock b(outputFilesQCS); + outputFiles.push(currentOwnedOutputFile); // NB: takes ownership } + + auto res = createSerialOutputStream(iFile, compressHandler, options, 2); // (2) input & output sharing totalCompressionBufferSize + outputStream.setown(std::get<0>(res)); + currentOutputIFileIO.setown(std::get<1>(res)); outputStreamSerializer = std::make_unique(outputStream); } void createNextInputStream() { CFileOwner *dequeuedOwnedIFile = nullptr; - assertex(outputFiles.dequeue(dequeuedOwnedIFile)); + { + CriticalBlock b(outputFilesQCS); + dequeuedOwnedIFile = outputFiles.front(); + outputFiles.pop(); + } currentOwnedInputFile.setown(dequeuedOwnedIFile); IFile *iFile = &currentOwnedInputFile->queryIFile(); trace("READ: reading from %s", iFile->queryFilename()); - currentInputIFileIO.setown(iFile->open(IFOread)); - Owned in = createSerialInputStream(currentInputIFileIO); - inputStream.setown(createBufferedInputStream(in, options.storageBlockSize, 0)); - if (compressHandler) - { - const char *decompressOptions = nullptr; - Owned decompressor = compressHandler->getExpander(decompressOptions); - Owned decompressed = createDecompressingInputStream(inputStream, decompressor); - inputStream.setown(createBufferedInputStream(decompressed, compressionBlockSize, 0)); - inputDeserializerSource.setStream(inputStream); - } + + auto res = 
createSerialInputStream(iFile, compressHandler, options, 2); // (2) input & output sharing totalCompressionBufferSize + inputStream.setown(std::get<0>(res)); + currentInputIFileIO.setown(std::get<1>(res)); + inputDeserializerSource.setStream(inputStream); } const void *readRowFromStream() { + // readRowFromStream() called from readToMarker (which will block before calling this if behind committedRows), + // or when outputComplete. + // Either way, it will not enter this method until the writer has committed ahead of the reader nextInputRow + // NB: currentTempFileEndRow will be 0 if 1st input read // nextInputRow can be > currentTempFileEndRow, because the writer/read may have used the Q // beyond this point, the next row in the stream could be anywhere above. @@ -758,24 +804,9 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, { if (!outputFileEndRowMarkers.empty()) { - // if the read caught up, and had previously set currentTempFileEndRow to unbounded, - // but now we have enties, spin through until outputFileEndRowMarkers ahead of nextInputRow - while (true) - { - currentTempFileEndRow = outputFileEndRowMarkers.front(); - outputFileEndRowMarkers.pop_front(); - if (currentTempFileEndRow > nextInputRow) - { - trace("READ: setting currentTempFileEndRow: %" RCPF "u", currentTempFileEndRow.load()); - break; - } - else if (outputFileEndRowMarkers.empty()) - { - trace("READ: setting currentTempFileEndRow: unbounded (2)"); - currentTempFileEndRow = (rowcount_t)-1; // unbounded for now, writer will set when it knows - break; - } - } + currentTempFileEndRow = outputFileEndRowMarkers.front(); + outputFileEndRowMarkers.pop(); + assertex(currentTempFileEndRow > nextInputRow); } else { @@ -845,40 +876,46 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, { if (pendingFlushToDiskSz <= threshold) return false; - rowcount_t _nextOutputRow = nextOutputRow.load(); - trace("WRITE: Flushed to disk. 
nextOutputRow = %" RCPF "u", _nextOutputRow); + rowcount_t currentNextOutputRow = nextOutputRow.load(); + trace("WRITE: Flushed to disk. nextOutputRow = %" RCPF "u", currentNextOutputRow); outputStream->flush(); currentTempFileSize += pendingFlushToDiskSz; + currentOwnedOutputFile->noteSize(currentTempFileSize); pendingFlushToDiskSz = 0; if (currentTempFileSize > options.tempFileGranularity) { currentTempFileSize = 0; { CriticalBlock b(outputStreamCS); + // set if reader isn't bounded yet, or queue next boundary if ((rowcount_t)-1 == currentTempFileEndRow) { - currentTempFileEndRow = _nextOutputRow; + currentTempFileEndRow = currentNextOutputRow; trace("WRITE: setting currentTempFileEndRow: %" RCPF "u", currentTempFileEndRow.load()); } - outputFileEndRowMarkers.push_back(nextOutputRow); - trace("WRITE: adding to tempFileEndRowMarker(size=%u): %" RCPF "u", (unsigned)outputFileEndRowMarkers.size(), _nextOutputRow); + else + { + outputFileEndRowMarkers.push(currentNextOutputRow); + trace("WRITE: adding to tempFileEndRowMarker(size=%u): %" RCPF "u", (unsigned)outputFileEndRowMarkers.size(), currentNextOutputRow); + } } createNextOutputStream(); } - committedRows = _nextOutputRow; + committedRows = currentNextOutputRow; return true; } void addRow(const void *row) { bool queued = false; size32_t rowSz = row ? 
thorRowMemoryFootprint(serializer, row) : 0; - if (rowSz + inMemRowsMemoryUsage < options.inMemMaxMem) + if (rowSz + inMemRowsMemoryUsage <= options.inMemMaxMem) queued = inMemRows.enqueue({ row, nextOutputRow, rowSz }); // takes ownership of 'row' if successful if (queued) { trace("WRITE: Q: nextOutputRow: %" RCPF "u", nextOutputRow.load()); inMemRowsMemoryUsage += rowSz; ++nextOutputRow; + recentlyQueued = true; } else { @@ -894,8 +931,6 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, } // do not wake up reader every time a row is queued (but granularly) to avoid excessive flapping - if (queued) - recentlyQueued = true; if (recentlyQueued && (0 == (nextOutputRow % readerWakeupGranularity))) { recentlyQueued = false; @@ -950,17 +985,17 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, ++nextInputRow; return ret; } - else if (markerRow > committedRows) // rows we need have not yet been committed to disk + else if (nextInputRow >= committedRows) // row we need have not yet been committed to disk. { CLeavableCriticalBlock b(readerWriterCS); - if (markerRow > committedRows) // JCSMORE this is waiting too long, I don't need markerRow to be committed, I need nextInputRow to be committed .. 
+ if (nextInputRow >= committedRows) { // wait for writer to commit readerWaitingForCommit = true; b.leave(); - trace("READ: waiting for committedRows(currently = %" RCPF "u) to catch up to markerRow(%" RCPF "u), nextInputRow = %" RCPF "u", committedRows.load(), markerRow, nextInputRow); + trace("READ: waiting for committedRows(currently = %" RCPF "u) to catch up to nextInputRow = %" RCPF "u", committedRows.load(), nextInputRow); moreRows.wait(); - assertex(markerRow <= committedRows); + assertex(nextInputRow < committedRows); } } const void *row = readRowFromStream(); @@ -993,21 +1028,13 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, inMemRows.setCapacity(options.inMemMaxMem / minSize); assertex(options.writeAheadSize < options.tempFileGranularity); - - // let's allow plenty of room. Allow for ~4TB of temp file usage (with default tempFileGranularity of 1GB) - size32_t capacity = 4000LL*1000*0x100000 / options.tempFileGranularity; - if (capacity > 100'000'000) // would only be so high if tempFileGranulity low (for testing) - capacity = 100'000'000; - outputFiles.setCapacity(capacity); } ~CCompressedSpillingRowStream() { - while (true) + while (!outputFiles.empty()) { - CFileOwner *fileOwner; - if (!outputFiles.dequeue(fileOwner)) - break; - fileOwner->Release(); + ::Release(outputFiles.front()); + outputFiles.pop(); } RowEntry e; while (true) @@ -2394,6 +2421,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf input; + unsigned numOutputs = 0; Linked meta; Linked serializer; Linked deserializer; @@ -2413,7 +2441,6 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfopen(IFOcreate)); // kept for stats purposes - Owned out = createSerialOutputStream(iFileIO); - outputStream.setown(createBufferedOutputStream(out, options.storageBlockSize)); //prefered plane block size - if (compressHandler) - { - const char *compressOptions = nullptr; - Owned compressor = compressHandler->getCompressor(compressOptions); - Owned compressed = 
createCompressingOutputStream(outputStream, compressor); - outputStream.setown(createBufferedOutputStream(compressed, compressionBlockSize)); - } + auto res = createSerialOutputStream(iFile, compressHandler, options, numOutputs + 1); + outputStream.setown(std::get<0>(res)); + iFileIO.setown(std::get<1>(res)); totalInputRowsRead = inMemTotalRows; } void writeRowsFromInput() @@ -2507,8 +2527,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf(row)); } public: - explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler) - : activity(*_activity), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), + explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler) + : activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()) { assertex(input); @@ -2518,17 +2538,6 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf options.inMemMaxMem) inMemReadAheadGranularity = options.inMemMaxMem; - constexpr size32_t minCompressionBlockSize = 256 * 1024; - memsize_t totalCompressionBufferSize = options.totalCompressionBufferSize; - if (totalCompressionBufferSize) - { - compressionBlockSize = (size32_t)(totalCompressionBufferSize / (numOutputs + 1)); // +1 for writer - if (compressionBlockSize < minCompressionBlockSize) - { - WARNLOG("Shared totalCompressionBufferSize=%" I64F "u, too small 
for number of outputs(%u). Using minCompressionBlockSize(%u) for writer and each reader.", (unsigned __int64)totalCompressionBufferSize, numOutputs, minCompressionBlockSize); - compressionBlockSize = minCompressionBlockSize; - } - } for (unsigned o=0; o getReadStream() // also pass back IFileIO for stats purposes { - Owned iFileIO = iFile->open(IFOread); - Owned in = createSerialInputStream(iFileIO); - Owned inputStream = createBufferedInputStream(in, options.storageBlockSize, 0); - if (compressHandler) - { - const char *decompressOptions = nullptr; - Owned decompressor = compressHandler->getExpander(decompressOptions); - Owned decompressed = createDecompressingInputStream(inputStream, decompressor); - inputStream.setown(createBufferedInputStream(decompressed, compressionBlockSize, 0)); - } - return { inputStream.getClear(), iFileIO.getClear() }; + return createSerialInputStream(iFile, compressHandler, options, numOutputs + 1); // +1 for writer } bool checkWriteAhead(rowcount_t &outputRowsAvailable) { diff --git a/thorlcr/thorutil/thbuf.hpp b/thorlcr/thorutil/thbuf.hpp index 3ee9a496929..1750f63b007 100644 --- a/thorlcr/thorutil/thbuf.hpp +++ b/thorlcr/thorutil/thbuf.hpp @@ -39,15 +39,21 @@ typedef QueueOf ThorRowQueue; struct CommonBufferRowRWStreamOptions { - offset_t storageBlockSize = 256 * 1024; // block size of read/write streams + offset_t storageBlockSize = 256 * 1024; // block size of read/write streams + size32_t minCompressionBlockSize = 256 * 1024; // minimum block size for compression memsize_t totalCompressionBufferSize = 3000 * 1024; // compression buffer size of read streams (split between writer and outputs) - memsize_t inMemMaxMem = 2000 * 1024; // before spilling begins. - offset_t writeAheadSize = 2000 * 1024; // once spilling, maximum size to write ahead + memsize_t inMemMaxMem = 2000 * 1024; // before spilling begins. 
+ offset_t writeAheadSize = 2000 * 1024; // once spilling, maximum size to write ahead unsigned heapFlags = roxiemem::RHFunique|roxiemem::RHFblocked; }; struct LookAheadOptions : CommonBufferRowRWStreamOptions { + LookAheadOptions() + { + // override defaults + totalCompressionBufferSize = 2000 * 1024; // compression buffer size of read streams (split between writer and outputs) + } offset_t tempFileGranularity = 1000 * 0x100000; // 1GB };