Skip to content

Commit

Permalink
leak fixes and minor clearup
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Jun 25, 2024
1 parent 2611a11 commit 51c7c9c
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,6 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
CriticalSection outputStreamCS;

// temp read related members
unsigned readTempFileNum = 0;
std::atomic<rowcount_t> currentTempFileEndRow = 0;
Owned<IFileIO> currentInputIFileIO; // keep for stats
Linked<CFileOwner> currentOwnedInputFile;
Expand Down Expand Up @@ -729,10 +728,8 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
}
void createNextInputStream()
{
dbgassertex(readTempFileNum <= writeTempFileNum);
CFileOwner *dequeuedOwnedIFile = nullptr;
bool tf = outputFiles.dequeue(dequeuedOwnedIFile);
assertex(tf);
assertex(outputFiles.dequeue(dequeuedOwnedIFile));
currentOwnedInputFile.setown(dequeuedOwnedIFile);
IFile *iFile = &currentOwnedInputFile->queryIFile();
trace("READ: reading from %s", iFile->queryFilename());
Expand Down Expand Up @@ -876,7 +873,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
bool queued = false;
size32_t rowSz = row ? thorRowMemoryFootprint(serializer, row) : 0;
if (rowSz + inMemRowsMemoryUsage < options.inMemMaxMem)
queued = inMemRows.enqueue({ row, nextOutputRow, rowSz });
queued = inMemRows.enqueue({ row, nextOutputRow, rowSz }); // takes ownership of 'row' if successful
if (queued)
{
trace("WRITE: Q: nextOutputRow: %" RCPF "u", nextOutputRow.load());
Expand All @@ -887,6 +884,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
{
trace("WRITE: S: nextOutputRow: %" RCPF "u", nextOutputRow.load());
writeRowToStream(row, rowSz); // JCSMORE - rowSz is memory not disk size... does it matter that much?
::ReleaseThorRow(row);
++nextOutputRow;
if (checkFlushToDisk(options.writeAheadSize))
{
Expand All @@ -912,7 +910,7 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
if (writeRow == nextInputRow)
{
#ifdef STRESSTEST_SPILLING_ROWSTREAM
if (0 == (nextInputRow % 100))
if (stressTest && (0 == (nextInputRow % 100)))
MilliSleep(5);
#endif

Expand Down Expand Up @@ -996,6 +994,8 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,

// let's allow plenty of room. Allow for ~4TB of temp file usage
size32_t capacity = 4000LL*1000*0x100000 / options.tempFileGranularity;
if (capacity > 100000) // cap silly sizes, if someone has customized tempFileGranularity
capacity = 100000;
outputFiles.setCapacity(capacity);
}
~CCompressedSpillingRowStream()
Expand All @@ -1007,6 +1007,18 @@ class CCompressedSpillingRowStream: public CSimpleInterfaceOf<ISmartRowBuffer>,
break;
fileOwner->Release();
}
RowEntry e;
while (true)
{
if (!inMemRows.dequeue(e))
break;
const void *row = std::get<0>(e);
if (row)
ReleaseThorRow(row);
}
const void *markerRow = std::get<0>(readFromStreamMarker);
if (markerRow)
ReleaseThorRow(markerRow);
}

// ISmartRowBuffer
Expand Down

0 comments on commit 51c7c9c

Please sign in to comment.