diff --git a/thorlcr/msort/tsortm.cpp b/thorlcr/msort/tsortm.cpp index 4c346539ece..30b4d0ad55a 100644 --- a/thorlcr/msort/tsortm.cpp +++ b/thorlcr/msort/tsortm.cpp @@ -248,6 +248,7 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface Linked rowif; Linked auxrowif; Linked keyIf; + Owned partitionHeap; int AddSlave(ICommunicator *comm,rank_t rank,SocketEndpoint &endpoint,mptag_t mpTagRPC) { @@ -522,9 +523,10 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface CriticalSection &asect; unsigned averagesamples; rowcount_t averagerecspernode; + roxiemem::IVariableRowHeap *heap; public: - casyncfor1(CSortMaster &_owner, NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect) - : owner(_owner), slaves(_slaves), sample(_sample), asect(_asect) + casyncfor1(CSortMaster &_owner, NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect, roxiemem::IVariableRowHeap *_heap) + : owner(_owner), slaves(_slaves), sample(_sample), asect(_asect), heap(_heap) { averagesamples = _averagesamples; averagerecspernode = _averagerecspernode; @@ -537,13 +539,28 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface if (slavesamples) { size32_t samplebufsize; - void *samplebuf=NULL; - slave.GetMultiNthRow(slavesamples, samplebufsize, samplebuf); - MemoryBuffer mb; - fastLZDecompressToBuffer(mb, samplebuf); - free(samplebuf); + OwnedMalloc samplebuf; + void *p; + slave.GetMultiNthRow(slavesamples, samplebufsize, p); + samplebuf.setown(p); + size32_t expandedSz; + const void *expandedPtr = nullptr; + MemoryBuffer expandedSampleMb; + OwnedConstThorRow expandedSample; + if (heap) + { + expandedSample.setown(fastLZDecompressToRoxieMem(*heap, samplebuf, expandedSz)); + expandedPtr = expandedSample; + } + else + { + fastLZDecompressToBuffer(expandedSampleMb, samplebuf); + expandedSz = 
expandedSampleMb.length(); + expandedPtr = expandedSampleMb.toByteArray(); + } + samplebuf.clear(); CriticalBlock block(asect); - CThorStreamDeserializerSource d(mb.length(), mb.toByteArray()); + CThorStreamDeserializerSource d(expandedSz, expandedPtr); while (!d.eos()) { RtlDynamicRowBuilder rowBuilder(owner.keyIf->queryRowAllocator()); @@ -552,7 +569,7 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface } } } - } afor1(*this, slaves,sample,averagesamples,averagerecspernode,asect); + } afor1(*this, slaves,sample,averagesamples,averagerecspernode,asect,partitionHeap); afor1.For(numnodes, 20, true); #ifdef TRACE_PARTITION2 { @@ -655,7 +672,6 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface return splitMap.getClear(); } - rowcount_t *CalcPartition(bool logging) { CriticalBlock block(ECFcrit); @@ -727,9 +743,10 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface Semaphore *nextsem; unsigned numsplits; IThorRowInterfaces *keyIf; + roxiemem::IVariableRowHeap *heap; public: - casyncfor2(NodeArray &_slaves, CThorExpandingRowArray &_totmid, unsigned _numsplits, Semaphore *_nextsem, IThorRowInterfaces *_keyIf) - : slaves(_slaves), totmid(_totmid), keyIf(_keyIf) + casyncfor2(NodeArray &_slaves, CThorExpandingRowArray &_totmid, unsigned _numsplits, Semaphore *_nextsem, IThorRowInterfaces *_keyIf, roxiemem::IVariableRowHeap *_heap) + : slaves(_slaves), totmid(_totmid), keyIf(_keyIf), heap(_heap) { nextsem = _nextsem; numsplits = _numsplits; @@ -737,21 +754,40 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface void Do(unsigned i) { CSortNode &slave = slaves.item(i); - void *p = NULL; size32_t retlen=0; - if (slave.numrecs!=0) - slave.GetMultiMidPointStop(retlen,p); + OwnedMalloc midPoints; + if (slave.numrecs!=0) + { + // if the rows this SORT is based on are large, these midpoints (nodes-1) could be large (but compressed) + // could consider serializing and deserializing them in chunks. 
+ void *p; + slave.GetMultiMidPointStop(retlen, p); + midPoints.setown(p); + } if (i) nextsem[i-1].wait(); try { unsigned base = totmid.ordinality(); - if (p) + if (midPoints) { - MemoryBuffer mb; - fastLZDecompressToBuffer(mb, p); - free(p); - CThorStreamDeserializerSource d(mb.length(), mb.toByteArray()); + size32_t expandedSz; + const void *expandedPtr = nullptr; + MemoryBuffer expandedMidPointsMb; + OwnedConstThorRow expandedMidPoints; + if (heap) + { + expandedMidPoints.setown(fastLZDecompressToRoxieMem(*heap, midPoints, expandedSz)); + expandedPtr = expandedMidPoints; + } + else + { + fastLZDecompressToBuffer(expandedMidPointsMb, midPoints); + expandedSz = expandedMidPointsMb.length(); + expandedPtr = expandedMidPointsMb.toByteArray(); + } + midPoints.clear(); + CThorStreamDeserializerSource d(expandedSz, expandedPtr); while (!d.eos()) { RtlDynamicRowBuilder rowBuilder(keyIf->queryRowAllocator()); @@ -777,7 +813,7 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface } nextsem[i].signal(); } - } afor2(slaves, totmid, numsplits, nextsem, keyIf); + } afor2(slaves, totmid, numsplits, nextsem, keyIf, partitionHeap); afor2.For(numnodes, 20); delete [] nextsem; @@ -1096,6 +1132,12 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface } ActPrintLog(activity, "Sort: canoptimizenullcolumns=%s, usepartitionrow=%s, betweensort=%s skewWarning=%f skewError=%f minisortthreshold=%" I64F "d",canoptimizenullcolumns?"true":"false",usepartitionrow?"true":"false",betweensort?"true":"false",skewWarning,skewError,(__int64)minisortthreshold); assertex(partitioninfo); + + constexpr byte AT_SortPartition=1; + activity_id actId = createCompoundActSeqId(activity->queryId(), AT_SortPartition); + if (activity->getOptBool(THOROPT_ROXIEMEM_GLOBALSORT_PARTITION, true)) + partitionHeap.setown(activity->queryRowManager()->createVariableRowHeap(actId | ACTIVITY_FLAG_ISREGISTERED, roxiemem::RoxieHeapFlags::RHFvariable)); + maxdeviance = _maxdeviance; 
unsigned i; bool overflowed = false; diff --git a/thorlcr/thorutil/thmem.cpp b/thorlcr/thorutil/thmem.cpp index cb912cf847a..7bc84dc31ce 100644 --- a/thorlcr/thorutil/thmem.cpp +++ b/thorlcr/thorutil/thmem.cpp @@ -2666,4 +2666,28 @@ IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllo return new COutputMetaWithChildRow(childAllocator, extraSz); } - +// Expands a fastLZ-framed buffer into a row allocated from 'heap'. +// 'src' begins with two size32_t values (expanded size, then compressed size) followed by the payload; +// if the two sizes are equal the payload was stored uncompressed and is copied as-is. +// 'expsz' receives the expanded size. Returns the allocated row, which the caller must release. +// Throws if the decompressed length differs from the expanded size recorded in the header. +void *fastLZDecompressToRoxieMem(roxiemem::IVariableRowHeap &heap, const void * src, size32_t &expsz) +{ + size32_t *sz = (size32_t *)src; + expsz = *(sz++); + size32_t cmpsz = *(sz++); + memsize_t capacity; + void *o = heap.allocate(expsz, capacity); + if (cmpsz!=expsz) + { + size32_t written = fastlz_decompress(sz,cmpsz,o,expsz); + if (written!=expsz) + { + ReleaseRoxieRow(o); // do not leak the heap row when reporting corrupt input + throw MakeStringException(0, "fastLZDecompressToRoxieMem - corrupt data(1) %u %u",written,expsz); + } + } + else + memcpy_iflen(o,sz,expsz); + return o; +} diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index a89d4cdd0f2..afed88e3bdb 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -573,6 +573,7 @@ extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, I extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IThorRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, EmptyRowSemantics emptyRowSemantics=ers_forbidden); +extern graph_decl void *fastLZDecompressToRoxieMem(roxiemem::IVariableRowHeap &heap, const void * src, size32_t &expsz); class CSDSServerStatus; diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index ade5bf5c688..744fbb31e31 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -125,6 +125,7 @@ #define THOROPT_LOOKAHEAD_WRITEAHEADK "readAheadWriteAheadK" // Splitter spilling write ahead size (K) (default = 2MB) #define THOROPT_LOOKAHEAD_COMPRESSIONTOTALK 
"readAheadCompressionTotalK" // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB) #define THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY "readAheadTempFileGranularity" // Splitter temp file granularity (default = 1GB) +#define THOROPT_ROXIEMEM_GLOBALSORT_PARTITION "useRoxieMemGlobalSortPartition" // Use roxiemem for global sort partitioning (default = true) #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000 // max of row matches before selfjoin emits warning