Skip to content

Commit

Permalink
Merge pull request #18866 from shamser/issue32193
Browse files (browse the repository at this point in the history)
HPCC-32193 Fix some issues with spill stats in smart join activity

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Sep 16, 2024
2 parents 699f0a0 + 5b67f03 commit 0f9e143
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 50 deletions.
59 changes: 48 additions & 11 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
} *rowProcessor;

CriticalSection rhsRowLock;
mutable CriticalSection rhsRowLock;
Owned<CBroadcaster> broadcaster;
CBroadcaster *channel0Broadcaster;
CriticalSection *broadcastLock;
Expand Down Expand Up @@ -1099,7 +1099,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
{
CThorSpillableRowArray *rows = rhsSlaveRows.item(a);
if (rows)
{
mergeRemappedStats(inactiveStats, rows, diskToTempStatsMap);
rows->kill();
}
}
rhs.kill();
}
Expand Down Expand Up @@ -1815,6 +1818,8 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
// NB: Only used by channel 0
Owned<CFileOwner> overflowWriteFile;
Owned<IExtRowWriter> overflowWriteStream;
OwnedIFileIO overflowWriteFileIO;
mutable CriticalSection critOverflowWriteFileIO;
rowcount_t overflowWriteCount;
OwnedMalloc<IChannelDistributor *> channelDistributors;
unsigned nextRhsToSpill = 0;
Expand Down Expand Up @@ -2103,7 +2108,6 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
if (isSmart())
{
overflowWriteCount = 0;
overflowWriteFile.clear();
overflowWriteStream.clear();
rightRowManager->addRowBuffer(this);
}
Expand All @@ -2114,7 +2118,15 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
CriticalBlock b(broadcastSpillingLock);
rhsRows = getGlobalRHSTotal(); // flushes all rhsSlaveRows arrays to calculate total.
if (hasFailedOverToLocal())
{
if (overflowWriteFileIO)
{
mergeRemappedStats(PARENT::inactiveStats, overflowWriteFileIO, diskToTempStatsMap);
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
overflowWriteFileIO.clear();
}
overflowWriteStream.clear(); // broadcast has finished, no more can be written
}
}
if (!hasFailedOverToLocal())
{
Expand Down Expand Up @@ -2162,6 +2174,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
mergeRemappedStats(PARENT::inactiveStats, &rows, diskToTempStatsMap);
rhs.appendRows(rows, true); // NB: This should not cause spilling, rhs is already sized and we are only copying ptrs in
rows.kill(); // free up ptr table asap
}
Expand Down Expand Up @@ -2486,6 +2499,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
Owned<IThorRowLoader> rowLoader = createThorRowLoader(*this, queryRowInterfaces(leftITDL), helper->isLeftAlreadyLocallySorted() ? NULL : compareLeft);
rowLoader->setTracingPrefix("Join left");
left.setown(rowLoader->load(left, abortSoon, false));
mergeRemappedStats(PARENT::inactiveStats, rowLoader, diskToTempStatsMap);
leftITDL = queryInput(0); // reset
ActPrintLog("LHS loaded/sorted");

Expand Down Expand Up @@ -2557,6 +2571,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
*/

Owned<IThorRowCollector> rightCollector;
Owned<IException> exception;
try
{
CMarker marker(*this);
Expand Down Expand Up @@ -2681,12 +2696,19 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
catch (IException *e)
{
if (!isOOMException(e))
throw e;
IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
// rows may either be in separate slave row arrays or in single rhs array, or split.
rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality());
throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL);
exception.setown(e);
else
{
IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
// rows may either be in separate slave row arrays or in single rhs array, or split.
rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality());
exception.setown(checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL));
}
}
if (rightCollector && rightCollector->hasSpilt())
mergeRemappedStats(PARENT::inactiveStats, rightCollector, diskToTempStatsMap);
if (exception)
throw exception.getClear();
}
public:
static bool needDedup(IHThorHashJoinArg *helper)
Expand Down Expand Up @@ -2950,7 +2972,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
return true;
}
CriticalBlock b(rhsRowLock);
if (overflowWriteFile)
if (overflowWriteFileIO)
{
/* Tried to do this outside the critical section above, but if it was empty then and has now overflowed, it needs to be done inside.
* Will be a one-off, if it happens at all
Expand All @@ -2963,7 +2985,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
overflowWriteCount += rhsInRowsTemp.ordinality();
ForEachItemIn(r, rhsInRowsTemp)
overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite));
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
return true;
}
if (hasFailedOverToLocal())
Expand Down Expand Up @@ -3008,12 +3030,13 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
GetTempFilePath(tempFilename, "lookup_local");
ActPrintLog("Overflowing RHS broadcast rows to spill file: %s", tempFilename.str());
overflowWriteFile.setown(container.queryActivity()->createOwnedTempFile(tempFilename.str()));
overflowWriteStream.setown(createRowWriter(&(overflowWriteFile->queryIFile()), queryRowInterfaces(rightITDL), rwFlags));
overflowWriteFileIO.setown(overflowWriteFile->queryIFile().open(IFOcreate));
overflowWriteStream.setown(createRowWriter(overflowWriteFileIO, queryRowInterfaces(rightITDL), rwFlags));

overflowWriteCount += rhsInRowsTemp.ordinality();
ForEachItemIn(r, rhsInRowsTemp)
overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite));
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
return true;
}
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
Expand All @@ -3025,6 +3048,19 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
activeStats.setStatistic(StNumSmartJoinDegradedToLocal, aggregateFailoversToLocal); // NB: is going to be same for all slaves.
activeStats.setStatistic(StNumSmartJoinSlavesDegradedToStd, aggregateFailoversToStandard);
}
{
CriticalBlock b(critOverflowWriteFileIO);
if (overflowWriteFileIO)
mergeRemappedStats(activeStats, overflowWriteFileIO, diskToTempStatsMap);
}
{
CriticalBlock b(rhsRowLock);
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
mergeRemappedStats(activeStats, &rows, diskToTempStatsMap);
}
}
}
};

Expand Down Expand Up @@ -3367,6 +3403,7 @@ class CAllJoinSlaveActivity : public CInMemJoinBase<CAllTable, IHThorAllJoinArg>
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
mergeRemappedStats(PARENT::inactiveStats, &rows, diskToTempStatsMap);
rhs.appendRows(rows, true);
rows.kill(); // free up ptr table asap
}
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
{
PARENT::gatherActiveStats(activeStats);
if (sharedRowStream)
::mergeStats(activeStats, sharedRowStream);
mergeRemappedStats(activeStats, sharedRowStream, diskToTempStatsMap);
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/msort/tsorts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
throw;
}

mergeStats(spillStats, sortedloader);
mergeRemappedStats(spillStats, sortedloader, diskToTempStatsMap);

if (!abort)
{
Expand Down
14 changes: 7 additions & 7 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2449,7 +2449,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
bool inputGrouped = false;
SharedRowStreamReaderOptions options;
size32_t inMemReadAheadGranularity = 0;
CRuntimeStatisticCollection inactiveStats;
CRuntimeStatisticCollection stats;
CRuntimeStatisticCollection previousFileStats;
StringAttr baseTmpFilename;

Expand Down Expand Up @@ -2483,21 +2483,21 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream.clear();
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current
updateStatsDelta(stats, previousFileStats, iFileIO); // NB: also updates prev to current
previousFileStats.reset();
iFileIO.clear();
}
}
void createOutputStream()
{
closeWriter(); // Ensure stats from closing files are preserved in inactiveStats
closeWriter(); // Ensure stats from closing files are preserved in stats
// NB: Called once, when spilling starts.
tempFileOwner.setown(activity.createOwnedTempFile(baseTmpFilename));
auto res = createSerialOutputStream(&(tempFileOwner->queryIFile()), compressHandler, options, numOutputs + 1);
outputStream.setown(std::get<0>(res));
iFileIO.setown(std::get<1>(res));
totalInputRowsRead = inMemTotalRows;
inactiveStats.addStatistic(StNumSpills, 1);
stats.addStatistic(StNumSpills, 1);
}
void writeRowsFromInput()
{
Expand Down Expand Up @@ -2539,7 +2539,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current
updateStatsDelta(stats, previousFileStats, iFileIO); // NB: also updates prev to current
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand All @@ -2553,7 +2553,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *_baseTmpFilename, ICompressHandler *_compressHandler)
: activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler), baseTmpFilename(_baseTmpFilename),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()),
inactiveStats(spillStatistics), previousFileStats(spillStatistics)
stats(tempFileStatistics), previousFileStats(tempFileStatistics)
{
assertex(input);

Expand Down Expand Up @@ -2717,7 +2717,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
return inactiveStats.getStatisticValue(kind);
return stats.getStatisticValue(kind);
}
};

Expand Down
Loading

0 comments on commit 0f9e143

Please sign in to comment.