Skip to content

Commit

Permalink
HPCC-32193 Fix some issues with spill stats in smart join activity
Browse files Browse the repository at this point in the history
The current temp file statistics for smartjoin did not include all
stats from all temp files:
1) temp files were closed before their sizes were recorded in the stats
2) stats from some types of temp files, such as overflowWriteFile
from RHS, were not being tracked
3) stats from temp files that were closed in CSpillableStreamBase
were not preserved
4) peak temp file size was not tracked in CThorSpillableRowArray
5) make CThorRowCollectorBase use the stats from CThorSpillableRowArray
for more accurate and simpler temp stats tracking.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Aug 8, 2024
1 parent 7abc301 commit 5aaba8c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 34 deletions.
35 changes: 30 additions & 5 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
{
CThorSpillableRowArray *rows = rhsSlaveRows.item(a);
if (rows)
{
mergeStats(inactiveStats, rows);
rows->kill();
}
}
rhs.kill();
}
Expand Down Expand Up @@ -1808,6 +1811,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
// NB: Only used by channel 0
Owned<CFileOwner> overflowWriteFile;
Owned<IExtRowWriter> overflowWriteStream;
OwnedIFileIO overflowWriteFileIO;
rowcount_t overflowWriteCount;
OwnedMalloc<IChannelDistributor *> channelDistributors;
unsigned nextRhsToSpill = 0;
Expand Down Expand Up @@ -2096,8 +2100,14 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
if (isSmart())
{
overflowWriteCount = 0;
overflowWriteFile.clear();
overflowWriteStream.clear();
if (overflowWriteFileIO)
{
mergeRemappedStats(PARENT::inactiveStats, overflowWriteFileIO, diskToTempStatsMap);
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
overflowWriteFileIO.clear();
}
overflowWriteFile.clear();
rightRowManager->addRowBuffer(this);
}
doBroadcastRHS(stopping);
Expand Down Expand Up @@ -2156,6 +2166,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
rhs.appendRows(rows, true); // NB: This should not cause spilling, rhs is already sized and we are only copying ptrs in
mergeStats(PARENT::inactiveStats, &rows);
rows.kill(); // free up ptr table asap
}
// Have to keep broadcastSpillingLock locked until sort and calculate are done
Expand Down Expand Up @@ -2504,6 +2515,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
throwUnexpected();
}
joinHelper->init(left, right, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL));
mergeStats(PARENT::inactiveStats, rowLoader);
}
void getRHS(bool stopping)
{
Expand Down Expand Up @@ -2678,8 +2690,12 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
// rows may either be in separate slave row arrays or in single rhs array, or split.
rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality());
if (rightCollector && rightCollector->hasSpilt())
mergeStats(PARENT::inactiveStats, rightCollector);
throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL);
}
if (rightCollector && rightCollector->hasSpilt())
mergeStats(PARENT::inactiveStats, rightCollector);
}
public:
static bool needDedup(IHThorHashJoinArg *helper)
Expand Down Expand Up @@ -2943,7 +2959,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
return true;
}
CriticalBlock b(rhsRowLock);
if (overflowWriteFile)
if (overflowWriteFileIO)
{
/* Tried to do outside crit above, but if empty, and now overflow, need to inside
* Will be one off if at all
Expand All @@ -2956,7 +2972,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
overflowWriteCount += rhsInRowsTemp.ordinality();
ForEachItemIn(r, rhsInRowsTemp)
overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite));
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
return true;
}
if (hasFailedOverToLocal())
Expand Down Expand Up @@ -3001,12 +3017,13 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
GetTempFilePath(tempFilename, "lookup_local");
ActPrintLog("Overflowing RHS broadcast rows to spill file: %s", tempFilename.str());
overflowWriteFile.setown(container.queryActivity()->createOwnedTempFile(tempFilename.str()));
overflowWriteStream.setown(createRowWriter(&(overflowWriteFile->queryIFile()), queryRowInterfaces(rightITDL), rwFlags));
overflowWriteFileIO.setown(overflowWriteFile->queryIFile().open(IFOcreate));
overflowWriteStream.setown(createRowWriter(overflowWriteFileIO, queryRowInterfaces(rightITDL), rwFlags));

overflowWriteCount += rhsInRowsTemp.ordinality();
ForEachItemIn(r, rhsInRowsTemp)
overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite));
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
return true;
}
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
Expand All @@ -3018,6 +3035,13 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
activeStats.setStatistic(StNumSmartJoinDegradedToLocal, aggregateFailoversToLocal); // NB: is going to be same for all slaves.
activeStats.setStatistic(StNumSmartJoinSlavesDegradedToStd, aggregateFailoversToStandard);
}
if (overflowWriteFileIO)
mergeRemappedStats(activeStats, overflowWriteFileIO, diskToTempStatsMap);
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
mergeStats(activeStats, &rows);
}
}
};

Expand Down Expand Up @@ -3360,6 +3384,7 @@ class CAllJoinSlaveActivity : public CInMemJoinBase<CAllTable, IHThorAllJoinArg>
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
mergeStats(PARENT::inactiveStats, &rows);
rhs.appendRows(rows, true);
rows.kill(); // free up ptr table asap
}
Expand Down
47 changes: 20 additions & 27 deletions thorlcr/thorutil/thmem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class CSpillableStreamBase : public CSpillable
unsigned spillCompInfo;
CThorSpillableRowArray rows;
Owned<CFileOwner> spillFile;
CRuntimeStatisticCollection inactiveStats;

bool spillRows()
{
Expand All @@ -248,13 +249,13 @@ class CSpillableStreamBase : public CSpillable
spillFile.setown(activity.createOwnedTempFile(tempName.str()));
VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority);
rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
mergeStats(inactiveStats, &rows);
rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
spillFile->noteSize(spillFile->queryIFile().size());
return true;
}
public:
CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority)
: CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics)
: CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics), inactiveStats(spillStatistics)
{
assertex(inRows.isFlushed());
spillCompInfo = 0x0;
Expand All @@ -265,6 +266,10 @@ class CSpillableStreamBase : public CSpillable
{
ensureSpillingCallbackRemoved();
}
// Returns the value held for statistic 'kind' in this stream's inactiveStats collection.
// NB: spillRows() merges the stats of 'rows' into inactiveStats before killing 'rows',
// so the values reported here are still available after the row array has been killed.
unsigned __int64 getStatistic(StatisticKind kind) const
{
return inactiveStats.getStatisticValue(kind);
}
// IBufferedRowCallback
virtual bool freeBufferedRows(bool critical) override
{
Expand Down Expand Up @@ -1328,13 +1333,13 @@ void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb)
}

CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity)
: CThorExpandingRowArray(activity)
: CThorExpandingRowArray(activity), inactiveStats(spillStatistics)
{
throwOnOom = false;
}

CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)
: CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta)
: CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta), inactiveStats(spillStatistics)
{
}

Expand Down Expand Up @@ -1363,6 +1368,7 @@ void CThorSpillableRowArray::kill()
{
clearRows();
CThorExpandingRowArray::kill();
inactiveStats.reset();
}

void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
Expand Down Expand Up @@ -1413,7 +1419,8 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom
nextCB = &cbCopy.popGet();
nextCBI = nextCB->queryRecordNumber();
}
Owned<IExtRowWriter> writer = createRowWriter(&iFileOwner.queryIFile(), rowIf, rwFlags, nullptr, compBlkSz);
OwnedIFileIO iFileIO = iFileOwner.queryIFile().open(IFOcreate);
Owned<IExtRowWriter> writer = createRowWriter(iFileIO, rowIf, rwFlags, nullptr, compBlkSz);
rowidx_t i=0;
rowidx_t rowsWritten=0;
try
Expand Down Expand Up @@ -1452,7 +1459,6 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom
++i;
}
writer->flush(NULL);
iFileOwner.noteSize(writer->getStatistic(StSizeDiskWrite));
}
catch (IException *e)
{
Expand All @@ -1463,6 +1469,10 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom
firstRow += n;
offset_t bytesWritten = writer->getPosition();
writer.clear();
mergeRemappedStats(inactiveStats, iFileIO, diskToTempStatsMap);
offset_t sizeTempFile = iFileIO->getStatistic(StSizeDiskWrite);
iFileOwner.noteSize(sizeTempFile);
inactiveStats.addStatistic(StNumSpills, 1);
ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u, firstRow = %u", _tracingPrefix, rowsWritten, (__int64)bytesWritten, firstRow);
return rowsWritten;
}
Expand Down Expand Up @@ -1638,11 +1648,8 @@ class CThorRowCollectorBase : public CSpillable
Owned<CSharedSpillableRowSet> spillableRowSet;
unsigned options = 0;
unsigned spillCompInfo = 0;
RelaxedAtomic<unsigned> statOverflowCount{0};
RelaxedAtomic<offset_t> statSizeSpill{0};
RelaxedAtomic<__uint64> statSpillCycles{0};
RelaxedAtomic<__uint64> statSortCycles{0};

bool spillRows(bool critical)
{
//This must only be called while a lock is held on spillableRows
Expand All @@ -1668,11 +1675,6 @@ class CThorRowCollectorBase : public CSpillable
spillableRows.save(*tempFileOwner, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows
spillFiles.append(tempFileOwner.getLink());
++overflowCount;
statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class
offset_t tempFileSize = tempFileOwner->queryIFile().size();
statSizeSpill.fastAdd(tempFileSize);
tempFileOwner->noteSize(tempFileSize);
statSpillCycles.fastAdd(spillTimer.elapsedCycles());
return true;
}
void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics)
Expand Down Expand Up @@ -1960,26 +1962,17 @@ class CThorRowCollectorBase : public CSpillable
{
options = _options;
}
unsigned __int64 getStatistic(StatisticKind kind)
unsigned __int64 getStatistic(StatisticKind kind) const
{
switch (kind)
{
case StCycleSpillElapsedCycles:
return statSpillCycles;
case StCycleSortElapsedCycles:
return statSortCycles;
case StTimeSpillElapsed:
return cycle_to_nanosec(statSpillCycles);
case StTimeSortElapsed:
return cycle_to_nanosec(statSortCycles);
case StNumSpills:
return statOverflowCount;
case StSizeSpillFile:
return statSizeSpill;
default:
break;
return spillableRows.getStatistic(kind);
}
return 0;
}
bool hasSpilt() const { return overflowCount >= 1; }

Expand Down Expand Up @@ -2048,7 +2041,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
}
virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); }
virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); }
virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); }
virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); }
virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); }
virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); }
virtual void reset() override { CThorRowCollectorBase::reset(); }
Expand Down Expand Up @@ -2103,7 +2096,7 @@ class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowColle
}
virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); }
virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); }
virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); }
virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); }
virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); }
virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); }
// IThorArrayLock
Expand Down
8 changes: 6 additions & 2 deletions thorlcr/thorutil/thmem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem
mutable CriticalSection cs;
ICopyArrayOf<IWritePosCallback> writeCallbacks;
size32_t compBlkSz = 0; // means use default

CRuntimeStatisticCollection inactiveStats; // reset after each kill
bool _flush(bool force);
void doFlush();
inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); }
Expand Down Expand Up @@ -484,6 +484,10 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem

inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe!
inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommited rows
// Returns the value held for statistic 'kind' in this row array's inactiveStats collection.
// NB: inactiveStats is reset by kill(), so callers that need these stats must read
// (or merge) them before calling kill().
inline unsigned __int64 getStatistic(StatisticKind kind) const
{
return inactiveStats.getStatisticValue(kind);
}

// access to
void swap(CThorSpillableRowArray &src);
Expand Down Expand Up @@ -542,7 +546,7 @@ interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock
virtual void setup(ICompare *iCompare, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0;
virtual void resize(rowidx_t max) = 0;
virtual void setOptions(unsigned options) = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
virtual bool hasSpilt() const = 0; // equivalent to numOverlows() >= 1
virtual void setTracingPrefix(const char *tracing) = 0;
virtual void reset() = 0;
Expand Down

0 comments on commit 5aaba8c

Please sign in to comment.