Skip to content

Commit

Permalink
Merge pull request #18738 from jakesmith/HPCC-32017-newsplitter
Browse files Browse the repository at this point in the history
HPCC-32017 New compressing splitter implementation

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jun 14, 2024
2 parents 65e845e + 2116ab4 commit fcb0440
Show file tree
Hide file tree
Showing 7 changed files with 650 additions and 43 deletions.
29 changes: 29 additions & 0 deletions rtl/eclrtl/rtlread_imp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,34 @@ class ECLRTL_API CThorStreamDeserializerSource : implements IRowDeserializerSour
Linked<ISerialStream> in; // could use a CStreamSerializer class (with inlines to improve)
};

// IRowSerializerTarget implementation that forwards serialized row data to an
// IBufferedSerialOutputStream. Nested sections are bracketed by suspend()/resume()
// calls on the stream, with resume() being handed the size32_t length of the
// nested section.
// NOTE(review): suspend()/resume() presumably reserve and later patch a length
// prefix in the stream - confirm against IBufferedSerialOutputStream.
class ECLRTL_API COutputStreamSerializer : public CSimpleInterfaceOf<IRowSerializerTarget>
{
    Linked<IBufferedSerialOutputStream> outputStream;
    unsigned nesting = 0;               // number of beginNested() calls not yet matched by endNested()
    offset_t outerNestingOffset = 0;    // stream position recorded when the outermost nesting level was entered

public:
    COutputStreamSerializer(IBufferedSerialOutputStream *_outputStream) : outputStream(_outputStream)
    {
    }

    // Write len bytes from ptr straight through to the underlying stream.
    virtual void put(size32_t len, const void * ptr) override
    {
        outputStream->put(len, ptr);
    }

    // Start a nested section. Returns the current stream position relative to the
    // position recorded for the outermost nesting level; the caller passes this
    // value back to endNested(). The count argument is not used.
    virtual size32_t beginNested(size32_t count) override
    {
        outputStream->suspend(sizeof(size32_t));
        const bool enteringOutermost = (0 == nesting);
        ++nesting;
        if (enteringOutermost)
            outerNestingOffset = outputStream->tell(); // base for all offsets returned while nested
        return outputStream->tell() - outerNestingOffset;
    }

    // Finish the nested section whose beginNested() call returned delta: the number
    // of bytes written since then is computed and handed to resume().
    virtual void endNested(size32_t delta) override
    {
        const offset_t sectionStart = outerNestingOffset + delta;
        size32_t patchedLength = outputStream->tell() - sectionStart;
        outputStream->resume(sizeof(size32_t), &patchedLength);
        --nesting;
    }
};


#endif
7 changes: 5 additions & 2 deletions system/jlib/jutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2759,10 +2759,10 @@ StringBuffer &getFileAccessUrl(StringBuffer &out)
return out;
}


#ifdef _CONTAINERIZED
bool getDefaultPlane(StringBuffer &ret, const char * componentOption, const char * category)
{
if (!isContainerized())
throwUnexpectedX("getDefaultPlane() called from non-container system");
// If the plane is specified for the component, then use that
if (getComponentConfigSP()->getProp(componentOption, ret))
return true;
Expand All @@ -2780,8 +2780,11 @@ bool getDefaultPlane(StringBuffer &ret, const char * componentOption, const char
return false;
}

#ifdef _CONTAINERIZED
static bool getDefaultPlaneDirectory(StringBuffer &ret, const char * componentOption, const char * category)
{
if (!isContainerized())
throwUnexpectedX("getDefaultPlaneDirectory() called from non-container system");
StringBuffer planeName;
if (!getDefaultPlane(planeName, componentOption, category))
return false;
Expand Down
2 changes: 0 additions & 2 deletions system/jlib/jutil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,9 +657,7 @@ extern jlib_decl bool checkCreateDaemon(unsigned argc, const char * * argv);
//Create password of specified length, containing UpperCaseAlphas, LowercaseAlphas, numerics and symbols
extern jlib_decl const char * generatePassword(StringBuffer &pwd, int pwdLen);

#ifdef _CONTAINERIZED
extern jlib_decl bool getDefaultPlane(StringBuffer &ret, const char * componentOption, const char * category);
#endif

extern jlib_decl void getResourceFromJfrog(StringBuffer &localPath, IPropertyTree &item);

Expand Down
88 changes: 69 additions & 19 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CSplitterOutput : public CSimpleInterfaceOf<IStartableEngineRowStream>, pu
NSplitterSlaveActivity &activity;
Semaphore writeBlockSem;
bool started = false, stopped = false;
IRowStream *splitterStream = nullptr;

unsigned outIdx;
rowcount_t rec = 0, max = 0;
Expand All @@ -43,6 +44,7 @@ class CSplitterOutput : public CSimpleInterfaceOf<IStartableEngineRowStream>, pu
{
started = stopped = false;
rec = max = 0;
splitterStream = nullptr;
}
inline bool isStopped() const { return stopped; }

Expand Down Expand Up @@ -80,6 +82,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
typedef CSlaveActivity PARENT;

bool spill = false;
bool newSplitter = false;
bool eofHit = false;
bool writeBlocked = false, pagedOut = false;
CriticalSection connectLock, prepareInputLock, writeAheadCrit;
Expand All @@ -91,7 +94,8 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
unsigned connectedOutputCount = (unsigned)-1; // uninitialized
rowcount_t recsReady = 0;
Owned<IException> writeAheadException;
Owned<ISharedSmartBuffer> smartBuf;
Owned<ISharedRowStreamReader> sharedRowStream;
Owned<ISharedSmartBufferRowWriter> sharedSmartRowWriter;
bool inputPrepared = false;
bool inputConnected = false;
unsigned numOutputs = 0;
Expand Down Expand Up @@ -161,6 +165,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
spill = dV>0;
ForEachItemIn(o, container.outputs)
appendOutput(new CSplitterOutput(*this, o));
newSplitter = getOptBool("newsplitter", false);
if (getOptBool("forcenewsplitter", false))
{
newSplitter = true;
spill = true;
}
}
virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
{
Expand Down Expand Up @@ -219,29 +229,58 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
assertex(activeOutputCount); // must be >=1, as an output start() invoked prepareInput
if (1 == activeOutputCount)
return; // single output in use which will be read directly
if (smartBuf)
smartBuf->reset();
if (sharedRowStream)
sharedRowStream->reset();
else
{
if (spill)
{
StringBuffer tempname;
GetTempFilePath(tempname, "nsplit");
smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, queryRowInterfaces(input)));
ActPrintLog("Using temp spill file: %s", tempname.str());
if (newSplitter)
{
    // New splitter implementation: build the options for the shared spilling
    // write-ahead stream, starting from the SharedRowStreamReaderOptions defaults
    // and overriding them from plane attributes and activity options.
    SharedRowStreamReaderOptions options;
    if (isContainerized())
    {
        // Look up the temp plane first, falling back to the spill plane, and use
        // its BlockedSequentialIO attribute (if set) as the storage block size.
        StringBuffer planeName;
        if (!getDefaultPlane(planeName, "@tempPlane", "temp"))
            getDefaultPlane(planeName, "@spillPlane", "spill");
        size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1);
        if ((size32_t)-1 != blockedSequentialIOSize)
            options.storageBlockSize = blockedSequentialIOSize;
    }
    // The following options are specified in K: the byte default is divided by 1024
    // for the lookup and the result is multiplied back up to bytes.
    options.totalCompressionBufferSize = getOptInt(THOROPT_SPLITTER_COMPRESSIONTOALK, options.totalCompressionBufferSize / 1024) * 1024;
    options.inMemMaxMem = getOptInt(THOROPT_SPLITTER_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024;
    options.spillWriteAheadSize = getOptInt64(THOROPT_SPLITTER_WRITEAHEADK, options.spillWriteAheadSize / 1024) * 1024;
    options.inMemReadAheadGranularity = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYK, options.inMemReadAheadGranularity / 1024) * 1024;
    // Row-count granularity: default to its own current value (previously the byte
    // granularity was passed as the default, overriding the row default whenever
    // the option was not set).
    options.inMemReadAheadGranularityRows = getOptInt(THOROPT_SPLITTER_READAHEADGRANULARITYROWS, options.inMemReadAheadGranularityRows);
    options.heapFlags = getOptInt("spillheapflags", options.heapFlags);

    // A total compression buffer size of 0 means no compress handler is passed.
    ICompressHandler *compressHandler = options.totalCompressionBufferSize ? queryDefaultCompressHandler() : nullptr;
    sharedRowStream.setown(createSharedFullSpillingWriteAhead(this, numOutputs, inputStream, isGrouped(), options, this, tempname.str(), compressHandler));
}
else
{
Owned<ISharedSmartBuffer> smartBuf = createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, this);
sharedRowStream.set(smartBuf);
sharedSmartRowWriter.setown(smartBuf->getWriter());
ActPrintLog("Using temp spill file: %s", tempname.str());
}
}
else
{
ActPrintLog("Spill is 'balanced'");
smartBuf.setown(createSharedSmartMemBuffer(this, numOutputs, queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE));
Owned<ISharedSmartBuffer> smartBuf = createSharedSmartMemBuffer(this, numOutputs, this, NSPLITTER_SPILL_BUFFER_SIZE);
sharedRowStream.set(smartBuf);
sharedSmartRowWriter.setown(smartBuf->getWriter());
cachedMetaInfo.canStall = true;
}
// mark any outputs already stopped
ForEachItemIn(o, outputs)
{
CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
if (output->isStopped() || !connectedOutputSet->test(o))
smartBuf->queryOutput(o)->stop();
sharedRowStream->queryOutput(o)->stop();
}
}
if (!spill)
Expand All @@ -261,7 +300,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
return inputStream->nextRow();
if (recsReady == current && writeAheadException.get())
throw LINK(writeAheadException);
return smartBuf->queryOutput(outIdx)->nextRow(); // will block until available
return sharedRowStream->queryOutput(outIdx)->nextRow(); // will block until available
}
rowcount_t writeahead(rowcount_t current, const bool &stopped, Semaphore &writeBlockSem, unsigned outIdx)
{
Expand Down Expand Up @@ -311,7 +350,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
row.setown(inputStream->nextRow());
if (row)
{
smartBuf->putRow(nullptr, this); // may call blocked() (see ISharedSmartBufferCallback impl. below)
sharedSmartRowWriter->putRow(nullptr, this); // may call blocked() (see ISharedSmartBufferCallback impl. below)
++recsReady;
}
}
Expand All @@ -321,10 +360,10 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
{
ActPrintLog("Splitter activity, hit end of input @ rec = %" RCPF "d", recsReady);
eofHit = true;
smartBuf->flush(); // signals no more rows will be written.
sharedSmartRowWriter->flush(); // signals no more rows will be written.
break;
}
smartBuf->putRow(row.getClear(), this); // can block if mem limited, but other readers can progress which is the point
sharedSmartRowWriter->putRow(row.getClear(), this); // can block if mem limited, but other readers can progress which is the point
++recsReady;
}
return recsReady;
Expand All @@ -339,12 +378,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
}
else
{
if (smartBuf)
if (sharedRowStream)
{
/* If no output has started reading (nextRow()), then it will not have been prepared
* If only 1 output is left, it will bypass the smart buffer when it starts.
*/
smartBuf->queryOutput(outIdx)->stop();
sharedRowStream->queryOutput(outIdx)->stop();
}
++stoppedOutputs;
if (stoppedOutputs == connectedOutputCount)
Expand All @@ -357,8 +396,8 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
void abort()
{
CSlaveActivity::abort();
if (smartBuf)
smartBuf->cancel();
if (sharedRowStream)
sharedRowStream->cancel();
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
Expand Down Expand Up @@ -460,6 +499,11 @@ void CSplitterOutput::start()
activity.prepareInput();
if (1 == activity.activeOutputCount)
max = RCMAX; // signals that no writeahead required
else
{
if (activity.newSplitter)
splitterStream = activity.sharedRowStream->queryOutput(outIdx);
}
dataLinkStart();
}

Expand All @@ -476,10 +520,16 @@ void CSplitterOutput::stop()
const void *CSplitterOutput::nextRow()
{
ActivityTimer t(slaveTimerStats, activity.queryTimeActivities());
if (rec == max) // NB: max will be RCMAX if activeOutputCount == 1
max = activity.writeahead(max, activity.queryAbortSoon(), writeBlockSem, outIdx);
const void *row = activity.nextRow(outIdx, rec); // pass ptr to max if need more
++rec;
const void *row;
if (splitterStream)
row = splitterStream->nextRow();
else
{
if (rec == max) // NB: max will be RCMAX if activeOutputCount == 1
max = activity.writeahead(max, activity.queryAbortSoon(), writeBlockSem, outIdx);
row = activity.nextRow(outIdx, rec); // pass ptr to max if need more
++rec;
}
if (row)
dataLinkIncrement();
return row;
Expand Down
Loading

0 comments on commit fcb0440

Please sign in to comment.