diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index eac053cd854..163630b575a 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -674,7 +674,6 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, CriticalSection outputStreamCS; // temp read related members - unsigned readTempFileNum = 0; std::atomic currentTempFileEndRow = 0; Owned currentInputIFileIO; // keep for stats Linked currentOwnedInputFile; @@ -729,10 +728,8 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, } void createNextInputStream() { - dbgassertex(readTempFileNum <= writeTempFileNum); CFileOwner *dequeuedOwnedIFile = nullptr; - bool tf = outputFiles.dequeue(dequeuedOwnedIFile); - assertex(tf); + assertex(outputFiles.dequeue(dequeuedOwnedIFile)); currentOwnedInputFile.setown(dequeuedOwnedIFile); IFile *iFile = ¤tOwnedInputFile->queryIFile(); trace("READ: reading from %s", iFile->queryFilename()); @@ -876,7 +873,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, bool queued = false; size32_t rowSz = row ? thorRowMemoryFootprint(serializer, row) : 0; if (rowSz + inMemRowsMemoryUsage < options.inMemMaxMem) - queued = inMemRows.enqueue({ row, nextOutputRow, rowSz }); + queued = inMemRows.enqueue({ row, nextOutputRow, rowSz }); // takes ownership of 'row' if successful if (queued) { trace("WRITE: Q: nextOutputRow: %" RCPF "u", nextOutputRow.load()); @@ -887,6 +884,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, { trace("WRITE: S: nextOutputRow: %" RCPF "u", nextOutputRow.load()); writeRowToStream(row, rowSz); // JCSMORE - rowSz is memory not disk size... does it matter that much? + ::ReleaseThorRow(row); ++nextOutputRow; if (checkFlushToDisk(options.writeAheadSize)) { @@ -912,7 +910,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, if (writeRow == nextInputRow) { #ifdef STRESSTEST_SPILLING_ROWSTREAM - if (0 == (nextInputRow % 100)) + if (stressTest && (0 == (nextInputRow % 100))) MilliSleep(5); #endif @@ -996,6 +994,8 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, // let's allow plenty of room. Allow for ~4TB of temp file usage size32_t capacity = 4000LL*1000*0x100000 / options.tempFileGranularity; + if (capacity > 100000) // cap silly sizes, if someone has customized tempFileGranularity + capacity = 100000; outputFiles.setCapacity(capacity); } ~CCompressedSpillingRowStream() @@ -1007,6 +1007,18 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf, break; fileOwner->Release(); } + RowEntry e; + while (true) + { + if (!inMemRows.dequeue(e)) + break; + const void *row = std::get<0>(e); + if (row) + ReleaseThorRow(row); + } + const void *markerRow = std::get<0>(readFromStreamMarker); + if (markerRow) + ReleaseThorRow(markerRow); } // ISmartRowBuffer