Skip to content

Commit

Permalink
Merge pull request #18805 from jakesmith/HPCC-32132-compressed-lookahead
Browse files Browse the repository at this point in the history
HPCC-32132 New compressed spilling lookahead implementation

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jun 27, 2024
2 parents 29ad0a3 + ee5c128 commit ca1439c
Show file tree
Hide file tree
Showing 6 changed files with 707 additions and 49 deletions.
54 changes: 54 additions & 0 deletions system/jlib/jqueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,5 +546,59 @@ class DListOf
}
};

// Lockfree Single Producer Single Conumser bounded queue implementation
// No mutexes are required to interact with the queue, as long as there's a single consumer thread, and a single writer thread.
template <typename T>
class CSPSCQueue
{
size32_t maxCapacity = 0;
std::vector<T> elements;
std::atomic<size32_t> head = 0;
std::atomic<size32_t> tail = 0;

inline size32_t increment(size32_t idx) const
{
size32_t next = idx+1;
if (next == maxCapacity)
next = 0;
return next;
}
public:
CSPSCQueue()
{
// should set capacity before using
}
CSPSCQueue(size32_t _maxCapacity)
: maxCapacity(_maxCapacity + 1), // +1 to distinguish full vs empty
elements(maxCapacity)
{
}
void setCapacity(size32_t _maxCapacity)
{
maxCapacity = _maxCapacity + 1;
elements.resize(maxCapacity);
}
bool enqueue(const T e)
{
size32_t currentHead = head;
size32_t nextHead = increment(currentHead);
if (nextHead == tail)
return false; // full

elements[currentHead] = std::move(e);
head = nextHead;
return true;
}
bool dequeue(T &res)
{
size32_t currentTail = tail;
if (currentTail == head)
return false; // empty

res = std::move(elements[currentTail]);
tail = increment(currentTail);
return true;
}
};

#endif
4 changes: 2 additions & 2 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
if ((size32_t)-1 != blockedSequentialIOSize)
options.storageBlockSize = blockedSequentialIOSize;
}
options.totalCompressionBufferSize = getOptInt(THOROPT_SPLITTER_COMPRESSIONTOALK, options.totalCompressionBufferSize / 1024) * 1024;
options.totalCompressionBufferSize = getOptInt(THOROPT_SPLITTER_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024;
options.inMemMaxMem = getOptInt(THOROPT_SPLITTER_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024;
options.spillWriteAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.spillWriteAheadSize / 1024) * 1024;
options.writeAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.writeAheadSize / 1024) * 1024;
options.inMemReadAheadGranularity = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYK, options.inMemReadAheadGranularity / 1024) * 1024;
options.inMemReadAheadGranularityRows = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYROWS, options.inMemReadAheadGranularity);
options.heapFlags = getOptInt("spillheapflags", options.heapFlags);
Expand Down
40 changes: 36 additions & 4 deletions thorlcr/activities/thactivityutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
rowcount_t required;
Semaphore startSem;
Owned<IException> getexception;
LookAheadOptions options;
bool newLookAhead = false;

class CThread: public Thread
{
Expand Down Expand Up @@ -94,12 +96,19 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
{
try
{
StringBuffer temp;
if (allowspill)
GetTempFilePath(temp,"lookahd");
assertex(bufsize);
if (allowspill)
smartbuf.setown(createSmartBuffer(&activity, temp.str(), bufsize, rowIf));
{
StringBuffer temp;
GetTempFilePath(temp,"lookahd");
if (newLookAhead)
{
ICompressHandler *compressHandler = options.totalCompressionBufferSize ? queryDefaultCompressHandler() : nullptr;
smartbuf.setown(createCompressedSpillingRowStream(&activity, temp.str(), preserveGrouping, rowIf, options, compressHandler));
}
else
smartbuf.setown(createSmartBuffer(&activity, temp.str(), bufsize, rowIf));
}
else
smartbuf.setown(createSmartInMemoryBuffer(&activity, rowIf, bufsize));
startSem.signal();
Expand Down Expand Up @@ -207,6 +216,29 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
running = true;
required = _required;
count = 0;

newLookAhead = activity.getOptBool("newlookahead", false);
if (activity.getOptBool("forcenewlookahead"))
{
newLookAhead = true;
allowspill = true;
}

// for "newlookahead" only
if (isContainerized())
{
// JCSMORE - add CJobBase::getTempBlockSize() to calc. once.
StringBuffer planeName;
if (!getDefaultPlane(planeName, "@tempPlane", "temp"))
getDefaultPlane(planeName, "@spillPlane", "spill");
size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1);
if ((size32_t)-1 != blockedSequentialIOSize)
options.storageBlockSize = blockedSequentialIOSize;
}
options.totalCompressionBufferSize = activity.getOptInt(THOROPT_LOOKAHEAD_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024;
options.inMemMaxMem = activity.getOptInt(THOROPT_LOOKAHEAD_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024;
options.writeAheadSize = activity.getOptInt64(THOROPT_LOOKAHEAD_WRITEAHEADK, options.writeAheadSize / 1024) * 1024;
options.tempFileGranularity = activity.getOptInt64(THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY, options.tempFileGranularity / 0x100000) * 0x100000;
}
~CRowStreamLookAhead()
{
Expand Down
Loading

0 comments on commit ca1439c

Please sign in to comment.