Skip to content

Commit

Permalink
HPCC-32193 Fix some issues with spill stats in smart join activity
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jul 10, 2024
1 parent 0878095 commit 67cd61d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 35 deletions.
35 changes: 30 additions & 5 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
{
CThorSpillableRowArray *rows = rhsSlaveRows.item(a);
if (rows)
{
mergeStats(inactiveStats, rows);
rows->kill();
}
}
rhs.kill();
}
Expand Down Expand Up @@ -1769,6 +1772,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
// NB: Only used by channel 0
Owned<CFileOwner> overflowWriteFile;
Owned<IExtRowWriter> overflowWriteStream;
OwnedIFileIO overflowWriteFileIO;
rowcount_t overflowWriteCount;
OwnedMalloc<IChannelDistributor *> channelDistributors;
unsigned nextRhsToSpill = 0;
Expand Down Expand Up @@ -2057,8 +2061,14 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
if (isSmart())
{
overflowWriteCount = 0;
overflowWriteFile.clear();
overflowWriteStream.clear();
if (overflowWriteFileIO)
{
mergeRemappedStats(PARENT::inactiveStats, overflowWriteFileIO, diskToSpillStatsMap);
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
overflowWriteFileIO.clear();
}
overflowWriteFile.clear();
rightRowManager->addRowBuffer(this);
}
doBroadcastRHS(stopping);
Expand Down Expand Up @@ -2117,6 +2127,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
rhs.appendRows(rows, true); // NB: This should not cause spilling, rhs is already sized and we are only copying ptrs in
mergeStats(PARENT::inactiveStats, &rows);
rows.kill(); // free up ptr table asap
}
// Have to keep broadcastSpillingLock locked until sort and calculate are done
Expand Down Expand Up @@ -2465,6 +2476,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
throwUnexpected();
}
joinHelper->init(left, right, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL));
mergeStats(PARENT::inactiveStats, rowLoader);
}
void getRHS(bool stopping)
{
Expand Down Expand Up @@ -2639,8 +2651,12 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
// rows may either be in separate slave row arrays or in single rhs array, or split.
rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality());
if (rightCollector && rightCollector->hasSpilt())
mergeStats(PARENT::inactiveStats, rightCollector);
throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL);
}
if (rightCollector && rightCollector->hasSpilt())
mergeStats(PARENT::inactiveStats, rightCollector);
}
public:
static bool needDedup(IHThorHashJoinArg *helper)
Expand Down Expand Up @@ -2904,7 +2920,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
return true;
}
CriticalBlock b(rhsRowLock);
if (overflowWriteFile)
if (overflowWriteFileIO)
{
/* Tried to do outside crit above, but if empty, and now overflow, need to inside
* Will be one off if at all
Expand All @@ -2917,7 +2933,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
overflowWriteCount += rhsInRowsTemp.ordinality();
ForEachItemIn(r, rhsInRowsTemp)
overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite));
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
return true;
}
if (hasFailedOverToLocal())
Expand Down Expand Up @@ -2962,12 +2978,13 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
GetTempFilePath(tempFilename, "lookup_local");
ActPrintLog("Overflowing RHS broadcast rows to spill file: %s", tempFilename.str());
overflowWriteFile.setown(container.queryActivity()->createOwnedTempFile(tempFilename.str()));
overflowWriteStream.setown(createRowWriter(&(overflowWriteFile->queryIFile()), queryRowInterfaces(rightITDL), rwFlags));
overflowWriteFileIO.setown(overflowWriteFile->queryIFile().open(IFOcreate));
overflowWriteStream.setown(createRowWriter(overflowWriteFileIO, queryRowInterfaces(rightITDL), rwFlags));

overflowWriteCount += rhsInRowsTemp.ordinality();
ForEachItemIn(r, rhsInRowsTemp)
overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite));
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
return true;
}
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
Expand All @@ -2979,6 +2996,13 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
activeStats.setStatistic(StNumSmartJoinDegradedToLocal, aggregateFailoversToLocal); // NB: is going to be same for all slaves.
activeStats.setStatistic(StNumSmartJoinSlavesDegradedToStd, aggregateFailoversToStandard);
}
if (overflowWriteFileIO)
mergeRemappedStats(activeStats, overflowWriteFileIO, diskToSpillStatsMap);
// Fold in the spill stats still held by the per-slave RHS row arrays
// (a CThorSpillableRowArray resets its stats when it is killed).
// Entries are null-checked before use, matching the other rhsSlaveRows walk
// in this file that tests `if (rows)` before touching an entry.
ForEachItemIn(a, rhsSlaveRows)
{
    CThorSpillableRowArray *rows = rhsSlaveRows.item(a);
    if (rows)
        mergeStats(activeStats, rows);
}
}
};

Expand Down Expand Up @@ -3321,6 +3345,7 @@ class CAllJoinSlaveActivity : public CInMemJoinBase<CAllTable, IHThorAllJoinArg>
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
mergeStats(PARENT::inactiveStats, &rows);
rhs.appendRows(rows, true);
rows.kill(); // free up ptr table asap
}
Expand Down
1 change: 0 additions & 1 deletion thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,6 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb)
// which the activity should use to protect any objects it uses whilst stats are being collected.

CRuntimeStatisticCollection serializedStats(inactiveStats);

{
CriticalBlock block(statsCs);
gatherActiveStats(serializedStats);
Expand Down
47 changes: 20 additions & 27 deletions thorlcr/thorutil/thmem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class CSpillableStreamBase : public CSpillable
unsigned spillCompInfo;
CThorSpillableRowArray rows;
Owned<CFileOwner> spillFile;
CRuntimeStatisticCollection inactiveStats;

bool spillRows()
{
Expand All @@ -248,13 +249,13 @@ class CSpillableStreamBase : public CSpillable
spillFile.setown(activity.createOwnedTempFile(tempName.str()));
VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority);
rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
mergeStats(inactiveStats, &rows);
rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
spillFile->noteSize(spillFile->queryIFile().size());
return true;
}
public:
CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority)
: CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics)
: CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics), inactiveStats(spillStatistics)
{
assertex(inRows.isFlushed());
spillCompInfo = 0x0;
Expand All @@ -265,6 +266,10 @@ class CSpillableStreamBase : public CSpillable
{
ensureSpillingCallbackRemoved();
}
// Returns the value recorded for `kind` in this stream's inactiveStats collection
// (spillRows() merges the row array's stats into it before killing the array).
// NOTE(review): not declared const or override, unlike the const getStatistic() on
// CThorSpillableRowArray and IThorRowCollectorCommon - confirm whether that is intended.
virtual unsigned __int64 getStatistic(StatisticKind kind)
{
return inactiveStats.getStatisticValue(kind);
}
// IBufferedRowCallback
virtual bool freeBufferedRows(bool critical) override
{
Expand Down Expand Up @@ -1317,13 +1322,13 @@ void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb)
}

CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity)
: CThorExpandingRowArray(activity)
: CThorExpandingRowArray(activity), inactiveStats(spillStatistics)
{
throwOnOom = false;
}

CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)
: CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta)
: CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta), inactiveStats(spillStatistics)
{
}

Expand Down Expand Up @@ -1352,6 +1357,7 @@ void CThorSpillableRowArray::kill()
{
clearRows();
CThorExpandingRowArray::kill();
// Spill stats do not survive a kill (the member is declared "reset after each kill"),
// so a caller that wants them must merge them out before calling kill().
inactiveStats.reset();
}

void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
Expand Down Expand Up @@ -1405,7 +1411,8 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom
nextCB = &cbCopy.popGet();
nextCBI = nextCB->queryRecordNumber();
}
Owned<IExtRowWriter> writer = createRowWriter(&iFileOwner.queryIFile(), rowIf, rwFlags, nullptr, compBlkSz);
OwnedIFileIO iFileIO = iFileOwner.queryIFile().open(IFOcreate);
Owned<IExtRowWriter> writer = createRowWriter(iFileIO, rowIf, rwFlags, nullptr, compBlkSz);
rowidx_t i=0;
rowidx_t rowsWritten=0;
try
Expand Down Expand Up @@ -1444,7 +1451,6 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom
++i;
}
writer->flush(NULL);
iFileOwner.noteSize(writer->getStatistic(StSizeDiskWrite));
}
catch (IException *e)
{
Expand All @@ -1455,6 +1461,10 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom
firstRow += n;
offset_t bytesWritten = writer->getPosition();
writer.clear();
mergeRemappedStats(inactiveStats, iFileIO, diskToSpillStatsMap);
offset_t sizeTempFile = iFileIO->getStatistic(StSizeDiskWrite);
iFileOwner.noteSize(sizeTempFile);
inactiveStats.addStatistic(StNumSpills, 1);
ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u, firstRow = %u", _tracingPrefix, rowsWritten, (__int64)bytesWritten, firstRow);
return rowsWritten;
}
Expand Down Expand Up @@ -1630,11 +1640,8 @@ class CThorRowCollectorBase : public CSpillable
Owned<CSharedSpillableRowSet> spillableRowSet;
unsigned options = 0;
unsigned spillCompInfo = 0;
RelaxedAtomic<unsigned> statOverflowCount{0};
RelaxedAtomic<offset_t> statSizeSpill{0};
RelaxedAtomic<__uint64> statSpillCycles{0};
RelaxedAtomic<__uint64> statSortCycles{0};

bool spillRows(bool critical)
{
//This must only be called while a lock is held on spillableRows
Expand All @@ -1660,11 +1667,6 @@ class CThorRowCollectorBase : public CSpillable
spillableRows.save(*tempFileOwner, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
spillFiles.append(tempFileOwner.getLink());
++overflowCount;
statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class
offset_t tempFileSize = tempFileOwner->queryIFile().size();
statSizeSpill.fastAdd(tempFileSize);
tempFileOwner->noteSize(tempFileSize);
statSpillCycles.fastAdd(spillTimer.elapsedCycles());
return true;
}
void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics)
Expand Down Expand Up @@ -1952,26 +1954,17 @@ class CThorRowCollectorBase : public CSpillable
{
options = _options;
}
unsigned __int64 getStatistic(StatisticKind kind)
unsigned __int64 getStatistic(StatisticKind kind) const
{
switch (kind)
{
case StCycleSpillElapsedCycles:
return statSpillCycles;
case StCycleSortElapsedCycles:
return statSortCycles;
case StTimeSpillElapsed:
return cycle_to_nanosec(statSpillCycles);
case StTimeSortElapsed:
return cycle_to_nanosec(statSortCycles);
case StNumSpills:
return statOverflowCount;
case StSizeSpillFile:
return statSizeSpill;
default:
break;
return spillableRows.getStatistic(kind);
}
return 0;
}
// True once overflowCount has reached at least 1 (spillRows() increments it after saving rows to a temp file).
bool hasSpilt() const { return overflowCount >= 1; }

Expand Down Expand Up @@ -2040,7 +2033,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
}
virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); }
virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); }
virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); }
virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); }
virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); }
virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); }
virtual void reset() override { CThorRowCollectorBase::reset(); }
Expand Down Expand Up @@ -2095,7 +2088,7 @@ class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowColle
}
virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); }
virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); }
virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); }
virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); }
virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); }
virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); }
// IThorArrayLock
Expand Down
8 changes: 6 additions & 2 deletions thorlcr/thorutil/thmem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem
mutable CriticalSection cs;
ICopyArrayOf<IWritePosCallback> writeCallbacks;
size32_t compBlkSz = 0; // means use default

CRuntimeStatisticCollection inactiveStats; // reset after each kill
bool _flush(bool force);
void doFlush();
// True when firstRow is non-zero and either `force` is set or firstRow >= commitRows/2.
inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); }
Expand Down Expand Up @@ -484,6 +484,10 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem

// Number of committed rows, computed as commitRows - firstRow.
inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe!
inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommitted rows
// Returns the value recorded for `kind` in this array's inactiveStats collection.
// NB: inactiveStats is reset by kill(), so read or merge the value before killing the array.
inline virtual unsigned __int64 getStatistic(StatisticKind kind) const
{
return inactiveStats.getStatisticValue(kind);
}

// access to
void swap(CThorSpillableRowArray &src);
Expand Down Expand Up @@ -542,7 +546,7 @@ interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock
virtual void setup(ICompare *iCompare, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0;
virtual void resize(rowidx_t max) = 0;
virtual void setOptions(unsigned options) = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
virtual bool hasSpilt() const = 0; // equivalent to numOverlows() >= 1
virtual void setTracingPrefix(const char *tracing) = 0;
virtual void reset() = 0;
Expand Down

0 comments on commit 67cd61d

Please sign in to comment.