diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 2236833cc2b..c1e62740c2f 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -957,7 +957,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } } *rowProcessor; - CriticalSection rhsRowLock; + mutable CriticalSection rhsRowLock; Owned broadcaster; CBroadcaster *channel0Broadcaster; CriticalSection *broadcastLock; @@ -1099,7 +1099,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, { CThorSpillableRowArray *rows = rhsSlaveRows.item(a); if (rows) + { + mergeRemappedStats(inactiveStats, rows, diskToTempStatsMap); rows->kill(); + } } rhs.kill(); } @@ -1815,6 +1818,8 @@ class CLookupJoinActivityBase : public CInMemJoinBase overflowWriteFile; Owned overflowWriteStream; + OwnedIFileIO overflowWriteFileIO; + mutable CriticalSection critOverflowWriteFileIO; rowcount_t overflowWriteCount; OwnedMalloc channelDistributors; unsigned nextRhsToSpill = 0; @@ -2103,7 +2108,6 @@ class CLookupJoinActivityBase : public CInMemJoinBaseaddRowBuffer(this); } @@ -2114,7 +2118,15 @@ class CLookupJoinActivityBase : public CInMemJoinBasenoteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); + overflowWriteFileIO.clear(); + } overflowWriteStream.clear(); // broadcast has finished, no more can be written + } } if (!hasFailedOverToLocal()) { @@ -2162,6 +2174,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase rowLoader = createThorRowLoader(*this, queryRowInterfaces(leftITDL), helper->isLeftAlreadyLocallySorted() ? NULL : compareLeft); rowLoader->setTracingPrefix("Join left"); left.setown(rowLoader->load(left, abortSoon, false)); + mergeRemappedStats(PARENT::inactiveStats, rowLoader, diskToTempStatsMap); leftITDL = queryInput(0); // reset ActPrintLog("LHS loaded/sorted"); @@ -2557,6 +2571,7 @@ class CLookupJoinActivityBase : public CInMemJoinBase rightCollector; + Owned exception; try { CMarker marker(*this); @@ -2681,12 +2696,19 @@ class CLookupJoinActivityBase : public CInMemJoinBasequeryFromActivity()->queryContainer().queryHelper()->queryOutputMeta(); - // rows may either be in separate slave row arrays or in single rhs array, or split. - rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality()); - throw checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL); + exception.setown(e); + else + { + IOutputMetaData *inputOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta(); + // rows may either be in separate slave row arrays or in single rhs array, or split. + rowcount_t total = rightCollector ? rightCollector->numRows() : (getGlobalRHSTotal() + rhs.ordinality()); + exception.setown(checkAndCreateOOMContextException(this, e, "gathering RHS rows for lookup join", total, inputOutputMeta, NULL)); + } } + if (rightCollector && rightCollector->hasSpilt()) + mergeRemappedStats(PARENT::inactiveStats, rightCollector, diskToTempStatsMap); + if (exception) + throw exception.getClear(); } public: static bool needDedup(IHThorHashJoinArg *helper) @@ -2950,7 +2972,7 @@ class CLookupJoinActivityBase : public CInMemJoinBaseputRow(rhsInRowsTemp.getClear(r)); - overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite)); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); return true; } if (hasFailedOverToLocal()) @@ -3008,12 +3030,13 @@ class CLookupJoinActivityBase : public CInMemJoinBasecreateOwnedTempFile(tempFilename.str())); - overflowWriteStream.setown(createRowWriter(&(overflowWriteFile->queryIFile()), queryRowInterfaces(rightITDL), rwFlags)); + overflowWriteFileIO.setown(overflowWriteFile->queryIFile().open(IFOcreate)); + overflowWriteStream.setown(createRowWriter(overflowWriteFileIO, queryRowInterfaces(rightITDL), rwFlags)); overflowWriteCount += rhsInRowsTemp.ordinality(); ForEachItemIn(r, rhsInRowsTemp) overflowWriteStream->putRow(rhsInRowsTemp.getClear(r)); - overflowWriteFile->noteSize(overflowWriteStream->getStatistic(StSizeDiskWrite)); + overflowWriteFile->noteSize(overflowWriteFileIO->getStatistic(StSizeDiskWrite)); return true; } virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const @@ -3025,6 +3048,19 @@ class CLookupJoinActivityBase : public CInMemJoinBase ForEachItemIn(a, rhsSlaveRows) { CThorSpillableRowArray &rows = *rhsSlaveRows.item(a); + mergeRemappedStats(PARENT::inactiveStats, &rows, diskToTempStatsMap); rhs.appendRows(rows, true); rows.kill(); // free up ptr table asap } diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index d733aa41a64..2cb322e6730 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -405,7 +405,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf { PARENT::gatherActiveStats(activeStats); if (sharedRowStream) - ::mergeStats(activeStats, sharedRowStream); + mergeRemappedStats(activeStats, sharedRowStream, diskToTempStatsMap); } // ISharedSmartBufferCallback impl. virtual void paged() { pagedOut = true; } diff --git a/thorlcr/msort/tsorts.cpp b/thorlcr/msort/tsorts.cpp index 7e58fa50ea1..10a95e6af0d 100644 --- a/thorlcr/msort/tsorts.cpp +++ b/thorlcr/msort/tsorts.cpp @@ -1338,7 +1338,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements throw; } - mergeStats(spillStats, sortedloader); + mergeRemappedStats(spillStats, sortedloader, diskToTempStatsMap); if (!abort) { diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 2e45c7c78d8..ad2d42bdb60 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -2449,7 +2449,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); - updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current + updateStatsDelta(stats, previousFileStats, iFileIO); // NB: also updates prev to current previousFileStats.reset(); iFileIO.clear(); } } void createOutputStream() { - closeWriter(); // Ensure stats from closing files are preserved in inactiveStats + closeWriter(); // Ensure stats from closing files are preserved in stats // NB: Called once, when spilling starts. tempFileOwner.setown(activity.createOwnedTempFile(baseTmpFilename)); auto res = createSerialOutputStream(&(tempFileOwner->queryIFile()), compressHandler, options, numOutputs + 1); outputStream.setown(std::get<0>(res)); iFileIO.setown(std::get<1>(res)); totalInputRowsRead = inMemTotalRows; - inactiveStats.addStatistic(StNumSpills, 1); + stats.addStatistic(StNumSpills, 1); } void writeRowsFromInput() { @@ -2539,7 +2539,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); totalInputRowsRead.fetch_add(newRowsWritten); tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); - updateRemappedStatsDelta(inactiveStats, previousFileStats, iFileIO, diskToTempStatsMap); // NB: also updates prev to current + updateStatsDelta(stats, previousFileStats, iFileIO); // NB: also updates prev to current // JCSMORE - could track size written, and start new file at this point (e.g. every 100MB), // and track their starting points (by row #) in a vector // We could then tell if/when the readers catch up, and remove consumed files as they do. @@ -2553,7 +2553,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfqueryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), - inactiveStats(spillStatistics), previousFileStats(spillStatistics) + stats(tempFileStatistics), previousFileStats(tempFileStatistics) { assertex(input); @@ -2717,7 +2717,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf spillFile; + CRuntimeStatisticCollection stats; bool spillRows() { @@ -248,13 +249,13 @@ class CSpillableStreamBase : public CSpillable spillFile.setown(activity.createOwnedTempFile(tempName.str())); VStringBuffer spillPrefixStr("SpillableStream(%u)", spillPriority); rows.save(*spillFile, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows + mergeStats(stats, &rows); rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded - spillFile->noteSize(spillFile->queryIFile().size()); return true; } public: CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority) - : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics) + : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics), stats(diskRemoteStatistics) { assertex(inRows.isFlushed()); spillCompInfo = 0x0; @@ -265,6 +266,10 @@ class CSpillableStreamBase : public CSpillable { ensureSpillingCallbackRemoved(); } + unsigned __int64 getStatistic(StatisticKind kind) const + { + return stats.getStatisticValue(kind); + } // IBufferedRowCallback virtual bool freeBufferedRows(bool critical) override { @@ -1328,13 +1333,13 @@ void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb) } CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity) - : CThorExpandingRowArray(activity) + : CThorExpandingRowArray(activity), stats(tempFileStatistics) { throwOnOom = false; } CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta) - : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta) + : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta), stats(tempFileStatistics) { } @@ -1363,6 +1368,7 @@ void CThorSpillableRowArray::kill() { clearRows(); CThorExpandingRowArray::kill(); + stats.reset(); } void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores) @@ -1413,7 +1419,8 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom nextCB = &cbCopy.popGet(); nextCBI = nextCB->queryRecordNumber(); } - Owned writer = createRowWriter(&iFileOwner.queryIFile(), rowIf, rwFlags, nullptr, compBlkSz); + OwnedIFileIO iFileIO = iFileOwner.queryIFile().open(IFOcreate); + Owned writer = createRowWriter(iFileIO, rowIf, rwFlags, nullptr, compBlkSz); rowidx_t i=0; rowidx_t rowsWritten=0; try @@ -1452,7 +1459,6 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom ++i; } writer->flush(NULL); - iFileOwner.noteSize(writer->getStatistic(StSizeDiskWrite)); } catch (IException *e) { @@ -1463,6 +1469,10 @@ rowidx_t CThorSpillableRowArray::save(CFileOwner &iFileOwner, unsigned _spillCom firstRow += n; offset_t bytesWritten = writer->getPosition(); writer.clear(); + mergeStats(stats, iFileIO); + offset_t sizeTempFile = iFileIO->getStatistic(StSizeDiskWrite); + iFileOwner.noteSize(sizeTempFile); + stats.addStatistic(StNumSpills, 1); ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u, firstRow = %u", _tracingPrefix, rowsWritten, (__int64)bytesWritten, firstRow); return rowsWritten; } @@ -1638,11 +1648,7 @@ class CThorRowCollectorBase : public CSpillable Owned spillableRowSet; unsigned options = 0; unsigned spillCompInfo = 0; - RelaxedAtomic statOverflowCount{0}; - RelaxedAtomic statSizeSpill{0}; - RelaxedAtomic<__uint64> statSpillCycles{0}; RelaxedAtomic<__uint64> statSortCycles{0}; - bool spillRows(bool critical) { //This must only be called while a lock is held on spillableRows @@ -1668,11 +1674,6 @@ class CThorRowCollectorBase : public CSpillable spillableRows.save(*tempFileOwner, spillCompInfo, false, spillPrefixStr.str()); // saves committed rows spillFiles.append(tempFileOwner.getLink()); ++overflowCount; - statOverflowCount.fastAdd(1); // NB: this is total over multiple uses of this class - offset_t tempFileSize = tempFileOwner->queryIFile().size(); - statSizeSpill.fastAdd(tempFileSize); - tempFileOwner->noteSize(tempFileSize); - statSpillCycles.fastAdd(spillTimer.elapsedCycles()); return true; } void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics) @@ -1960,26 +1961,17 @@ class CThorRowCollectorBase : public CSpillable { options = _options; } - unsigned __int64 getStatistic(StatisticKind kind) + unsigned __int64 getStatistic(StatisticKind kind) const { switch (kind) { - case StCycleSpillElapsedCycles: - return statSpillCycles; case StCycleSortElapsedCycles: return statSortCycles; - case StTimeSpillElapsed: - return cycle_to_nanosec(statSpillCycles); case StTimeSortElapsed: return cycle_to_nanosec(statSortCycles); - case StNumSpills: - return statOverflowCount; - case StSizeSpillFile: - return statSizeSpill; default: - break; + return spillableRows.getStatistic(kind); } - return 0; } bool hasSpilt() const { return overflowCount >= 1; } @@ -2048,7 +2040,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader } virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); } virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); } - virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); } virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); } virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); } virtual void reset() override { CThorRowCollectorBase::reset(); } @@ -2103,7 +2095,7 @@ class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowColle } virtual void resize(rowidx_t max) override { CThorRowCollectorBase::resize(max); } virtual void setOptions(unsigned options) override { CThorRowCollectorBase::setOptions(options); } - virtual unsigned __int64 getStatistic(StatisticKind kind) override { return CThorRowCollectorBase::getStatistic(kind); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override { return CThorRowCollectorBase::getStatistic(kind); } virtual bool hasSpilt() const override { return CThorRowCollectorBase::hasSpilt(); } virtual void setTracingPrefix(const char *tracing) override { CThorRowCollectorBase::setTracingPrefix(tracing); } // IThorArrayLock diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index 8e4f1b896a8..a89d4cdd0f2 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -413,7 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem mutable CriticalSection cs; ICopyArrayOf writeCallbacks; size32_t compBlkSz = 0; // means use default - + CRuntimeStatisticCollection stats; // reset after each kill bool _flush(bool force); void doFlush(); inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); } @@ -484,6 +484,10 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe! inline rowidx_t queryTotalRows() const { return CThorExpandingRowArray::ordinality(); } // includes uncommited rows + inline unsigned __int64 getStatistic(StatisticKind kind) const + { + return stats.getStatisticValue(kind); + } // access to void swap(CThorSpillableRowArray &src); @@ -542,7 +546,7 @@ interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock virtual void setup(ICompare *iCompare, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0; virtual void resize(rowidx_t max) = 0; virtual void setOptions(unsigned options) = 0; - virtual unsigned __int64 getStatistic(StatisticKind kind) = 0; + virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0; virtual bool hasSpilt() const = 0; // equivalent to numOverlows() >= 1 virtual void setTracingPrefix(const char *tracing) = 0; virtual void reset() = 0; diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index afc06cee34a..da8a8b06144 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -97,6 +97,7 @@ const StatisticsMapping hashDedupActivityStatistics({}, diskWriteRemoteStatistic const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics); const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics); const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakTempDisk, StSizePeakEphemeralDisk, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, executeStatistics); +const StatisticsMapping tempFileStatistics({StNumSpills}, diskRemoteStatistics); const StatKindMap diskToTempStatsMap ={ {StSizeDiskWrite, StSizeSpillFile}, diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index c2d05fccb31..49d30f1e7aa 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -175,6 +175,8 @@ extern graph_decl const StatisticsMapping soapcallActivityStatistics; extern graph_decl const StatisticsMapping indexReadFileStatistics; extern graph_decl const StatisticsMapping hashDedupActivityStatistics; extern graph_decl const StatisticsMapping hashDistribActivityStatistics; +extern graph_decl const StatisticsMapping tempFileStatistics; + // Maps disk related stats to spill stats extern graph_decl const std::map diskToTempStatsMap;