
Commit

Merge pull request #19178 from jakesmith/HPCC-32766-globalsort-roxiemem
HPCC-32766 Decompress partition points to roxiemem in global sort

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
ghalliday authored Oct 3, 2024
2 parents 1848747 + 1d6ea22 commit a6cacdc
Showing 4 changed files with 82 additions and 22 deletions.
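In outline, the change replaces the temporary MemoryBuffer that the partition sample/midpoint decompression expanded into with an allocation from a roxiemem variable row heap, so the expanded partition points are tracked by Thor's row manager. The sketch below consolidates the branch that both async loops in the tsortm.cpp diff now contain; the helper name expandPartitionBlock and the include paths are assumptions, while fastLZDecompressToBuffer, fastLZDecompressToRoxieMem, OwnedConstThorRow and roxiemem::IVariableRowHeap are taken from the diff.

#include "jbuff.hpp"     // MemoryBuffer
#include "jflz.hpp"      // fastLZDecompressToBuffer (assumed header)
#include "roxiemem.hpp"  // roxiemem::IVariableRowHeap
#include "thmem.hpp"     // OwnedConstThorRow, fastLZDecompressToRoxieMem (declaration added by this PR)

// Illustrative helper only: expand a compressed partition block either into the
// roxiemem heap (new path, tracked by the row manager) or into a MemoryBuffer
// (legacy fallback when no partition heap was created). The caller must keep
// fallbackMb/heapRow alive for as long as the returned pointer is used.
static const void *expandPartitionBlock(roxiemem::IVariableRowHeap *heap, const void *compressed,
                                        size32_t &expandedSz, MemoryBuffer &fallbackMb,
                                        OwnedConstThorRow &heapRow)
{
    if (heap)
    {
        heapRow.setown(fastLZDecompressToRoxieMem(*heap, compressed, expandedSz));
        return heapRow;
    }
    fastLZDecompressToBuffer(fallbackMb, compressed);
    expandedSz = fallbackMb.length();
    return fallbackMb.toByteArray();
}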
84 changes: 63 additions & 21 deletions thorlcr/msort/tsortm.cpp
@@ -248,6 +248,7 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
Linked<IThorRowInterfaces> rowif;
Linked<IThorRowInterfaces> auxrowif;
Linked<IThorRowInterfaces> keyIf;
Owned<roxiemem::IVariableRowHeap> partitionHeap;

int AddSlave(ICommunicator *comm,rank_t rank,SocketEndpoint &endpoint,mptag_t mpTagRPC)
{
@@ -522,9 +523,10 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
CriticalSection &asect;
unsigned averagesamples;
rowcount_t averagerecspernode;
roxiemem::IVariableRowHeap *heap;
public:
casyncfor1(CSortMaster &_owner, NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect)
: owner(_owner), slaves(_slaves), sample(_sample), asect(_asect)
casyncfor1(CSortMaster &_owner, NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect, roxiemem::IVariableRowHeap *_heap)
: owner(_owner), slaves(_slaves), sample(_sample), asect(_asect), heap(_heap)
{
averagesamples = _averagesamples;
averagerecspernode = _averagerecspernode;
@@ -537,13 +539,28 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
if (slavesamples)
{
size32_t samplebufsize;
void *samplebuf=NULL;
slave.GetMultiNthRow(slavesamples, samplebufsize, samplebuf);
MemoryBuffer mb;
fastLZDecompressToBuffer(mb, samplebuf);
free(samplebuf);
OwnedMalloc<void> samplebuf;
void *p;
slave.GetMultiNthRow(slavesamples, samplebufsize, p);
samplebuf.setown(p);
size32_t expandedSz;
const void *expandedPtr = nullptr;
MemoryBuffer expandedSampleMb;
OwnedConstThorRow expandedSample;
if (heap)
{
expandedSample.setown(fastLZDecompressToRoxieMem(*heap, samplebuf, expandedSz));
expandedPtr = expandedSample;
}
else
{
fastLZDecompressToBuffer(expandedSampleMb, samplebuf);
expandedSz = expandedSampleMb.length();
expandedPtr = expandedSampleMb.toByteArray();
}
samplebuf.clear();
CriticalBlock block(asect);
CThorStreamDeserializerSource d(mb.length(), mb.toByteArray());
CThorStreamDeserializerSource d(expandedSz, expandedPtr);
while (!d.eos())
{
RtlDynamicRowBuilder rowBuilder(owner.keyIf->queryRowAllocator());
@@ -552,7 +569,7 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
}
}
}
} afor1(*this, slaves,sample,averagesamples,averagerecspernode,asect);
} afor1(*this, slaves,sample,averagesamples,averagerecspernode,asect,partitionHeap);
afor1.For(numnodes, 20, true);
#ifdef TRACE_PARTITION2
{
@@ -655,7 +672,6 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
return splitMap.getClear();
}


rowcount_t *CalcPartition(bool logging)
{
CriticalBlock block(ECFcrit);
@@ -727,31 +743,51 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
Semaphore *nextsem;
unsigned numsplits;
IThorRowInterfaces *keyIf;
roxiemem::IVariableRowHeap *heap;
public:
casyncfor2(NodeArray &_slaves, CThorExpandingRowArray &_totmid, unsigned _numsplits, Semaphore *_nextsem, IThorRowInterfaces *_keyIf)
: slaves(_slaves), totmid(_totmid), keyIf(_keyIf)
casyncfor2(NodeArray &_slaves, CThorExpandingRowArray &_totmid, unsigned _numsplits, Semaphore *_nextsem, IThorRowInterfaces *_keyIf, roxiemem::IVariableRowHeap *_heap)
: slaves(_slaves), totmid(_totmid), keyIf(_keyIf), heap(_heap)
{
nextsem = _nextsem;
numsplits = _numsplits;
}
void Do(unsigned i)
{
CSortNode &slave = slaves.item(i);
void *p = NULL;
size32_t retlen=0;
if (slave.numrecs!=0)
slave.GetMultiMidPointStop(retlen,p);
OwnedMalloc<void> midPoints;
if (slave.numrecs!=0)
{
// if the rows this SORT is based on are large, these midpoints (nodes-1) could be large (but compressed)
// could consider serializing and deserializing them in chunks..
void *p;
slave.GetMultiMidPointStop(retlen, p);
midPoints.setown(p);
}
if (i)
nextsem[i-1].wait();
try
{
unsigned base = totmid.ordinality();
if (p)
if (midPoints)
{
MemoryBuffer mb;
fastLZDecompressToBuffer(mb, p);
free(p);
CThorStreamDeserializerSource d(mb.length(), mb.toByteArray());
size32_t expandedSz;
const void *expandedPtr = nullptr;
MemoryBuffer expandedMidPointsMb;
OwnedConstThorRow expandedMidPoints;
if (heap)
{
expandedMidPoints.setown(fastLZDecompressToRoxieMem(*heap, midPoints, expandedSz));
expandedPtr = expandedMidPoints;
}
else
{
fastLZDecompressToBuffer(expandedMidPointsMb, midPoints);
expandedSz = expandedMidPointsMb.length();
expandedPtr = expandedMidPointsMb.toByteArray();
}
midPoints.clear();
CThorStreamDeserializerSource d(expandedSz, expandedPtr);
while (!d.eos())
{
RtlDynamicRowBuilder rowBuilder(keyIf->queryRowAllocator());
@@ -777,7 +813,7 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
}
nextsem[i].signal();
}
} afor2(slaves, totmid, numsplits, nextsem, keyIf);
} afor2(slaves, totmid, numsplits, nextsem, keyIf, partitionHeap);
afor2.For(numnodes, 20);

delete [] nextsem;
@@ -1096,6 +1132,12 @@ class CSortMaster : public IThorSorterMaster, public CSimpleInterface
}
ActPrintLog(activity, "Sort: canoptimizenullcolumns=%s, usepartitionrow=%s, betweensort=%s skewWarning=%f skewError=%f minisortthreshold=%" I64F "d",canoptimizenullcolumns?"true":"false",usepartitionrow?"true":"false",betweensort?"true":"false",skewWarning,skewError,(__int64)minisortthreshold);
assertex(partitioninfo);

constexpr byte AT_SortPartition=1;
activity_id actId = createCompoundActSeqId(activity->queryId(), AT_SortPartition);
if (activity->getOptBool(THOROPT_ROXIEMEM_GLOBALSORT_PARTITION, true))
partitionHeap.setown(activity->queryRowManager()->createVariableRowHeap(actId | ACTIVITY_FLAG_ISREGISTERED, roxiemem::RoxieHeapFlags::RHFvariable));

maxdeviance = _maxdeviance;
unsigned i;
bool overflowed = false;
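The heap feeding the new path is created once per sort in CSortMaster (last hunk above), gated on the new activity option and keyed by a compound activity id, presumably so the expanded partition points are accounted against the sort activity. A condensed sketch of that gate; partitionHeap, activity and the helper/flag names are as they appear in the diff, and the surrounding scaffolding is assumed.

// partitionHeap is the Owned<roxiemem::IVariableRowHeap> member added to CSortMaster
if (activity->getOptBool(THOROPT_ROXIEMEM_GLOBALSORT_PARTITION, true))   // default: enabled
{
    constexpr byte AT_SortPartition = 1;
    activity_id actId = createCompoundActSeqId(activity->queryId(), AT_SortPartition);
    partitionHeap.setown(activity->queryRowManager()->createVariableRowHeap(
        actId | ACTIVITY_FLAG_ISREGISTERED, roxiemem::RoxieHeapFlags::RHFvariable));
}
// when the option is false, partitionHeap stays null and both loops keep the MemoryBuffer path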
18 changes: 17 additions & 1 deletion thorlcr/thorutil/thmem.cpp
@@ -2666,4 +2666,20 @@ IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllo
return new COutputMetaWithChildRow(childAllocator, extraSz);
}


void *fastLZDecompressToRoxieMem(roxiemem::IVariableRowHeap &heap, const void * src, size32_t &expsz)
{
size32_t *sz = (size32_t *)src;
expsz = *(sz++);
size32_t cmpsz = *(sz++);
memsize_t capacity;
void *o = heap.allocate(expsz, capacity);
if (cmpsz!=expsz)
{
size32_t written = fastlz_decompress(sz,cmpsz,o,expsz);
if (written!=expsz)
throw MakeStringException(0, "fastLZDecompressToRoxieMem - corrupt data(1) %d %d",written,expsz);
}
else
memcpy_iflen(o,sz,expsz);
return o;
}
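fastLZDecompressToRoxieMem reads a block laid out as a size32_t expanded size, a size32_t compressed size, then the payload, which is stored raw when the two sizes are equal. The round-trip check below is only a sketch: it assumes the compressing side is the platform's existing fastLZCompressToBuffer (header name assumed) and that the caller already owns a roxiemem variable row heap.

#include <string.h>
#include "jbuff.hpp"     // MemoryBuffer
#include "jflz.hpp"      // fastLZCompressToBuffer (assumed header)
#include "roxiemem.hpp"  // roxiemem::IVariableRowHeap
#include "thmem.hpp"     // OwnedConstThorRow, fastLZDecompressToRoxieMem

// Illustrative check only: compress an arbitrary block, expand it into the heap,
// and verify the round trip. OwnedConstThorRow releases the heap row on scope exit.
static bool roundTripsViaRoxieMem(roxiemem::IVariableRowHeap &heap, size32_t len, const void *data)
{
    MemoryBuffer compressed;
    fastLZCompressToBuffer(compressed, len, data);   // writes [expsz][cmpsz][payload]
    size32_t expandedSz = 0;
    OwnedConstThorRow expanded;
    expanded.setown(fastLZDecompressToRoxieMem(heap, compressed.toByteArray(), expandedSz));
    return (expandedSz == len) && (0 == memcmp(expanded, data, len));
}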
1 change: 1 addition & 0 deletions thorlcr/thorutil/thmem.hpp
@@ -573,6 +573,7 @@ extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, I
extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IThorRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, EmptyRowSemantics emptyRowSemantics=ers_forbidden);


extern graph_decl void *fastLZDecompressToRoxieMem(roxiemem::IVariableRowHeap &heap, const void * src, size32_t &expsz);


class CSDSServerStatus;
1 change: 1 addition & 0 deletions thorlcr/thorutil/thormisc.hpp
@@ -125,6 +125,7 @@
#define THOROPT_LOOKAHEAD_WRITEAHEADK "readAheadWriteAheadK" // Splitter spilling write ahead size (K) (default = 2MB)
#define THOROPT_LOOKAHEAD_COMPRESSIONTOTALK "readAheadCompressionTotalK" // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB)
#define THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY "readAheadTempFileGranularity" // Splitter temp file granularity (default = 1GB)
#define THOROPT_ROXIEMEM_GLOBALSORT_PARTITION "useRoxieMemGlobalSortPartition" // Use roxiemem for global sort partitioning (default = true)

#define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000 // max of row matches before selfjoin emits warning

