Skip to content

Commit

Permalink
add inMemReadAheadGranularityRows
Browse files Browse the repository at this point in the history
  • Loading branch information
jakesmith committed Jun 13, 2024
1 parent 3ad21e1 commit e232c84
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 4 deletions.
1 change: 1 addition & 0 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
// Splitter tuning options. The byte-sized settings are exposed as values in K, so each
// current default is converted to K for the option lookup and the result scaled back to bytes.
options.inMemMaxMem = getOptInt(THOROPT_SPLITTER_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024;
options.spillWriteAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.spillWriteAheadSize / 1024) * 1024;
options.inMemReadAheadGranularity = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYK, options.inMemReadAheadGranularity / 1024) * 1024;
// The row-based granularity is a plain row count, not a size in K: default to its own
// current value and do not apply the K <-> bytes scaling used for the settings above.
options.inMemReadAheadGranularityRows = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYROWS, options.inMemReadAheadGranularityRows);

ICompressHandler *compressHandler = queryDefaultCompressHandler();
sharedRowStream.setown(createSharedFullSpillingWriteAhead(this, numOutputs, inputStream, isGrouped(), options, queryRowInterfaces(input), tempname.str(), compressHandler));
Expand Down
5 changes: 3 additions & 2 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2050,7 +2050,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
return false;
}

// read more, up to inMemReadAheadGranularity before relinquishing
// read more, up to inMemReadAheadGranularity or inMemReadAheadGranularityRows before relinquishing
rowcount_t previousNumRows = rows.size();
while (true)
{
Expand All @@ -2061,7 +2061,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
size32_t sz = thorRowMemoryFootprint(serializer, row);
rows.emplace_back(row, sz);
rowsMemUsage += sz;
if (rowsMemUsage >= options.inMemReadAheadGranularity)
if ((rowsMemUsage >= options.inMemReadAheadGranularity) ||
(rows.size() >= options.inMemReadAheadGranularityRows))
break;
}
else
Expand Down
3 changes: 2 additions & 1 deletion thorlcr/thorutil/thbuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ struct SharedRowStreamReaderOptions
offset_t storageBlockSize = 256 * 1024; // block size of read/write streams
memsize_t compressionBlockSize = 256 * 1024; // compression buffer size of read streams
memsize_t inMemMaxMem = 2000 * 1024; // before spilling begins.
memsize_t inMemReadAheadGranularity = 256 * 1024; // granularity of read ahead (will allow readers to get in)
memsize_t inMemReadAheadGranularity = 128 * 1024; // granularity (bytes) of read ahead; the splitter option is given in K and scaled by 1024
rowcount_t inMemReadAheadGranularityRows = 64; // granularity (rows) of read ahead. NB: read ahead stops at whichever granularity is hit first
// JCSMORE - better name (because this is only applicable once starts spilling)
offset_t spillWriteAheadSize = 4000 * 1024; // once spilling, maximum size to write ahead
};
Expand Down
3 changes: 2 additions & 1 deletion thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
#define THOROPT_HDIST_COMPOPTIONS "hdCompressorOptions" // Distribute compressor options, e.g. AES key (default = "")
#define THOROPT_SPLITTER_SPILL "splitterSpill" // Force splitters to spill or not, default is to adhere to helper setting (default = -1)
#define THOROPT_SPLITTER_MAXROWMEMK "splitterRowMemK" // Splitter max memory (K) to use before spilling (default = 2MB)
#define THOROPT_SPLITTER_READAHEADGRANULARITYK "inMemReadAheadGranularity" // Splitter in memory read ahead granularity (K) (default = 256K)
#define THOROPT_SPLITTER_READAHEADGRANULARITYK "inMemReadAheadGranularityK" // Splitter in memory read ahead granularity (K) (default = 128K)
#define THOROPT_SPLITTER_READAHEADGRANULARITYROWS "inMemReadAheadGranularityRows" // Splitter in memory read ahead granularity (rows) (default = 64)
#define THOROPT_SPLITTER_WRITEAHEADK "splitterWriteAheadK" // Splitter spilling write ahead size (K) (default = 4MB)
#define THOROPT_SPLITTER_COMPRESSIONBLOCKK "splitterCompressionBlockSizeK" // Splitter compression block size (K) (default = 256K)
#define THOROPT_LOOP_MAX_EMPTY "loopMaxEmpty" // Max # of iterations that LOOP can cycle through with 0 results before errors (default = 1000)
Expand Down

0 comments on commit e232c84

Please sign in to comment.