Skip to content

Commit

Permalink
HPCC-32193 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Aug 22, 2024
1 parent 333932c commit ad04504
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 25 deletions.
50 changes: 32 additions & 18 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
} *rowProcessor;

CriticalSection rhsRowLock;
mutable CriticalSection rhsRowLock;
Owned<CBroadcaster> broadcaster;
CBroadcaster *channel0Broadcaster;
CriticalSection *broadcastLock;
Expand Down Expand Up @@ -1812,6 +1812,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
Owned<CFileOwner> overflowWriteFile;
Owned<IExtRowWriter> overflowWriteStream;
OwnedIFileIO overflowWriteFileIO;
mutable CriticalSection critOverflowWriteFileIO;
rowcount_t overflowWriteCount;
OwnedMalloc<IChannelDistributor *> channelDistributors;
unsigned nextRhsToSpill = 0;
Expand Down Expand Up @@ -2101,11 +2102,14 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
{
overflowWriteCount = 0;
overflowWriteStream.clear();
if (overflowWriteFileIO)
{
mergeRemappedStats(PARENT::inactiveStats, overflowWriteFileIO, diskToTempStatsMap);
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
overflowWriteFileIO.clear();
CriticalBlock b(critOverflowWriteFileIO);
if (overflowWriteFileIO)
{
mergeRemappedStats(PARENT::inactiveStats, overflowWriteFileIO, diskToTempStatsMap);
overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite));
overflowWriteFileIO.clear();
}
}
overflowWriteFile.clear();
rightRowManager->addRowBuffer(this);
Expand Down Expand Up @@ -2165,8 +2169,8 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
rhs.appendRows(rows, true); // NB: This should not cause spilling, rhs is already sized and we are only copying ptrs in
mergeStats(PARENT::inactiveStats, &rows);
rhs.appendRows(rows, true); // NB: This should not cause spilling, rhs is already sized and we are only copying ptrs in
rows.kill(); // free up ptr table asap
}
// Have to keep broadcastSpillingLock locked until sort and calculate are done
Expand Down Expand Up @@ -2562,6 +2566,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
*/

Owned<IThorRowCollector> rightCollector;
Owned<IException> exception;
try
{
CMarker marker(*this);
Expand Down Expand Up @@ -2686,16 +2691,19 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
catch (IException *e)
{
if (!isOOMException(e))
throw e;
IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
// rows may either be in separate slave row arrays or in single rhs array, or split.
rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality());
if (rightCollector && rightCollector->hasSpilt())
mergeStats(PARENT::inactiveStats, rightCollector);
throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL);
exception.setown(e);
else
{
IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
// rows may either be in separate slave row arrays or in single rhs array, or split.
rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality());
exception.setown(checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL));
}
}
if (rightCollector && rightCollector->hasSpilt())
mergeStats(PARENT::inactiveStats, rightCollector);
if (exception)
throw exception.getClear();
}
public:
static bool needDedup(IHThorHashJoinArg *helper)
Expand Down Expand Up @@ -3035,12 +3043,18 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
activeStats.setStatistic(StNumSmartJoinDegradedToLocal, aggregateFailoversToLocal); // NB: is going to be same for all slaves.
activeStats.setStatistic(StNumSmartJoinSlavesDegradedToStd, aggregateFailoversToStandard);
}
if (overflowWriteFileIO)
mergeRemappedStats(activeStats, overflowWriteFileIO, diskToTempStatsMap);
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
mergeStats(activeStats, &rows);
CriticalBlock b(critOverflowWriteFileIO);
if (overflowWriteFileIO)
mergeRemappedStats(activeStats, overflowWriteFileIO, diskToTempStatsMap);
}
{
CriticalBlock b(rhsRowLock);
ForEachItemIn(a, rhsSlaveRows)
{
CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
mergeRemappedStats(activeStats, &rows, diskToTempStatsMap);
}
}
}
};
Expand Down
10 changes: 5 additions & 5 deletions thorlcr/thorutil/thmem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1333,13 +1333,13 @@ void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb)
}

CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity)
: CThorExpandingRowArray(activity), inactiveStats(spillStatistics)
: CThorExpandingRowArray(activity), stats(tempFileStatistics)
{
throwOnOom = false;
}

CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)
: CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta), inactiveStats(spillStatistics)
: CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta), stats(tempFileStatistics)
{
}

Expand Down Expand Up @@ -1368,7 +1368,7 @@ void CThorSpillableRowArray::kill()
{
clearRows();
CThorExpandingRowArray::kill();
inactiveStats.reset();
stats.reset();
}

void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
Expand Down Expand Up @@ -1469,10 +1469,10 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom
firstRow += n;
offset_t bytesWritten = writer->getPosition();
writer.clear();
mergeRemappedStats(inactiveStats, iFileIO, diskToTempStatsMap);
mergeStats(stats, iFileIO);
offset_t sizeTempFile = iFileIO->getStatistic(StSizeDiskWrite);
iFileOwner.noteSize(sizeTempFile);
inactiveStats.addStatistic(StNumSpills, 1);
stats.addStatistic(StNumSpills, 1);
ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u, firstRow = %u", _tracingPrefix, rowsWritten, (__int64)bytesWritten, firstRow);
return rowsWritten;
}
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/thorutil/thmem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem
mutable CriticalSection cs;
ICopyArrayOf<IWritePosCallback> writeCallbacks;
size32_t compBlkSz = 0; // means use default
CRuntimeStatisticCollection inactiveStats; // reset after each kill
CRuntimeStatisticCollection stats; // reset after each kill
bool _flush(bool force);
void doFlush();
inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); }
Expand Down Expand Up @@ -486,7 +486,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem
inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommited rows
inline unsigned __int64 getStatistic(StatisticKind kind) const
{
return inactiveStats.getStatisticValue(kind);
return stats.getStatisticValue(kind);
}

// access to
Expand Down
1 change: 1 addition & 0 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const StatisticsMapping hashDedupActivityStatistics({}, spillStatistics, diskWri
const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics);
const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakTempDisk, StSizePeakEphemeralDisk, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, executeStatistics);
const StatisticsMapping tempFileStatistics({StNumSpills}, diskRemoteStatistics);

const StatKindMap diskToTempStatsMap
={ {StSizeDiskWrite, StSizeSpillFile},
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ extern graph_decl const StatisticsMapping soapcallActivityStatistics;
extern graph_decl const StatisticsMapping indexReadFileStatistics;
extern graph_decl const StatisticsMapping hashDedupActivityStatistics;
extern graph_decl const StatisticsMapping hashDistribActivityStatistics;
extern graph_decl const StatisticsMapping tempFileStatistics;


// Maps disk related stats to spill stats
extern graph_decl const std::map<StatisticKind, StatisticKind> diskToTempStatsMap;
Expand Down

0 comments on commit ad04504

Please sign in to comment.