diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 2d99be825ec..0e9479eb121 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -950,7 +950,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } } *rowProcessor; - CriticalSection rhsRowLock; + mutable CriticalSection rhsRowLock; Owned broadcaster; CBroadcaster *channel0Broadcaster; CriticalSection *broadcastLock; @@ -1812,6 +1812,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase overflowWriteFile; Owned overflowWriteStream; OwnedIFileIO overflowWriteFileIO; + mutable CriticalSection critOverflowWriteFileIO; rowcount_t overflowWriteCount; OwnedMalloc channelDistributors; unsigned nextRhsToSpill = 0; @@ -2101,11 +2102,14 @@ class CLookupJoinActivityBase : public CInMemJoinBasenoteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); - overflowWriteFileIO.clear(); + CriticalBlock b(critOverflowWriteFileIO); + if (overflowWriteFileIO) + { + mergeRemappedStats(PARENT::inactiveStats, overflowWriteFileIO, diskToTempStatsMap); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); + overflowWriteFileIO.clear(); + } } overflowWriteFile.clear(); rightRowManager->addRowBuffer(this); @@ -2165,8 +2169,8 @@ class CLookupJoinActivityBase : public CInMemJoinBase rightCollector; + Owned exception; try { CMarker marker(*this); @@ -2686,16 +2691,19 @@ class CLookupJoinActivityBase : public CInMemJoinBasequeryFromActivity()->queryContainer().queryHelper()->queryOutputMeta(); - // rows may either be in separate slave row arrays or in single rhs array, or split. - rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality()); - if (rightCollector && rightCollector->hasSpilt()) - mergeStats(PARENT::inactiveStats, rightCollector); - throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL); + exception.setown(e); + else + { + IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta(); + // rows may either be in separate slave row arrays or in single rhs array, or split. + rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality()); + exception.setown(checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL)); + } } if (rightCollector && rightCollector->hasSpilt()) mergeStats(PARENT::inactiveStats, rightCollector); + if (exception) + throw exception.getClear(); } public: static bool needDedup(IHThorHashJoinArg *helper) @@ -3035,12 +3043,18 @@ class CLookupJoinActivityBase : public CInMemJoinBasegetPosition(); writer.clear(); - mergeRemappedStats(inactiveStats, iFileIO, diskToTempStatsMap); + mergeStats(stats, iFileIO); offset_t sizeTempFile = iFileIO->getStatistic(StSizeDiskWrite); iFileOwner.noteSize(sizeTempFile); - inactiveStats.addStatistic(StNumSpills, 1); + stats.addStatistic(StNumSpills, 1); ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u, firstRow = %u", _tracingPrefix, rowsWritten, (__int64)bytesWritten, firstRow); return rowsWritten; } diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index bdcda28efbf..a89d4cdd0f2 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -413,7 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem mutable CriticalSection cs; ICopyArrayOf writeCallbacks; size32_t compBlkSz = 0; // means use default - CRuntimeStatisticCollection inactiveStats; // reset after each kill + CRuntimeStatisticCollection stats; // reset after each kill bool _flush(bool force); void doFlush(); inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); } @@ -486,7 +486,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommited rows inline unsigned __int64 getStatistic(StatisticKind kind) const { - return inactiveStats.getStatisticValue(kind); + return stats.getStatisticValue(kind); } // access to diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index a9e493e02e4..70bdedb9361 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -97,6 +97,7 @@ const StatisticsMapping hashDedupActivityStatistics({}, spillStatistics, diskWri const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics); const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakTempDisk, StSizePeakEphemeralDisk, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, executeStatistics); +const StatisticsMapping tempFileStatistics({StNumSpills}, diskRemoteStatistics); const StatKindMap diskToTempStatsMap ={ {StSizeDiskWrite, StSizeSpillFile}, diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 3159d58ad48..17ebe967e8b 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -167,6 +167,8 @@ extern graph_decl const StatisticsMapping soapcallActivityStatistics; extern graph_decl const StatisticsMapping indexReadFileStatistics; extern graph_decl const StatisticsMapping hashDedupActivityStatistics; extern graph_decl const StatisticsMapping hashDistribActivityStatistics; +extern graph_decl const StatisticsMapping tempFileStatistics; + // Maps disk related stats to spill stats extern graph_decl const std::map diskToTempStatsMap;