diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index de22da08908..ca9ccd0d6df 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -152,7 +152,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf } } public: - NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), writer(*this) + NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container, nsplitterActivityStatistics), writer(*this) { numOutputs = container.getOutputs(); connectedOutputSet.setown(createBitSet()); @@ -401,6 +401,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf if (sharedRowStream) sharedRowStream->cancel(); } + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const override + { + PARENT::gatherActiveStats(activeStats); + if (sharedRowStream) + ::mergeStats(activeStats, sharedRowStream); + } // ISharedSmartBufferCallback impl. virtual void paged() { pagedOut = true; } virtual void blocked() diff --git a/thorlcr/master/thactivitymaster.cpp b/thorlcr/master/thactivitymaster.cpp index 82ca2d647a9..f80ee0bd258 100644 --- a/thorlcr/master/thactivitymaster.cpp +++ b/thorlcr/master/thactivitymaster.cpp @@ -170,7 +170,6 @@ class CGenericMasterGraphElement : public CMasterGraphElement case TAKcase: case TAKchildcase: case TAKdegroup: - case TAKsplit: case TAKproject: case TAKprefetchproject: case TAKprefetchcountproject: @@ -210,6 +209,9 @@ class CGenericMasterGraphElement : public CMasterGraphElement case TAKemptyaction: ret = new CMasterActivity(this); break; + case TAKsplit: + ret = new CMasterActivity(this, nsplitterActivityStatistics); + break; case TAKsoap_rowdataset: case TAKsoap_rowaction: case TAKsoap_datasetdataset: diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index b1377f6db00..41fb29cb541 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -1753,6 +1753,10 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu queryCOutput(c).reset(); inMemRows->reset(0); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override + { + return 0; + } friend class COutput; friend class CRowSet; }; @@ -2145,6 +2149,24 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase tempFileIO->setSize(0); tempFileOwner->noteSize(0); } + virtual unsigned __int64 getStatistic(StatisticKind kind) const override + { + switch (kind) + { + case StSizeSpillFile: + return tempFileIO->getStatistic(StSizeDiskWrite); + case StCycleDiskWriteIOCycles: + case StTimeDiskWriteIO: + case StSizeDiskWrite: + return 0; + case StNumSpills: + return 1; + case StTimeSpillElapsed: + return tempFileIO->getStatistic(StCycleDiskWriteIOCycles); + default: + return tempFileIO->getStatistic(kind); + } + } }; ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IThorRowInterfaces *rowIf) @@ -2433,7 +2455,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf totalInputRowsRead = 0; // not used until spilling begins, represents count of all rows read rowcount_t inMemTotalRows = 0; // whilst in memory, represents count of all rows seen CriticalSection readAheadCS; // ensure single reader (leader), reads ahead (updates rows/totalInputRowsRead/inMemTotalRows) - Owned iFile; + Owned tempFileOwner; Owned iFileIO; Owned outputStream; Linked compressHandler; @@ -2442,6 +2464,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); + tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); + ::mergeStats(inactiveStats, iFileIO); + iFileIO.clear(); + outputStream.clear(); + } } void createOutputStream() { // NB: Called once, when spilling starts. - auto res = createSerialOutputStream(iFile, compressHandler, options, numOutputs + 1); + auto res = createSerialOutputStream(tempFileOwner->queryIFile(), compressHandler, options, numOutputs + 1); outputStream.setown(std::get<0>(res)); iFileIO.setown(std::get<1>(res)); totalInputRowsRead = inMemTotalRows; @@ -2517,7 +2547,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); totalInputRowsRead.fetch_add(newRowsWritten); - + tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); // JCSMORE - could track size written, and start new file at this point (e.g. every 100MB), // and track their starting points (by row #) in a vector // We could then tell if/when the readers catch up, and remove consumed files as they do. @@ -2530,7 +2560,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfqueryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()) + meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), + inactiveStats(spillingWriteAheadStatistics) { assertex(input); @@ -2541,15 +2572,11 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfremove(); - } + closeWriter(); freeRows(); } void outputStopped(unsigned output) @@ -2568,15 +2595,15 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfqueryFilename(), tracing.str()); + activity.ActPrintLog("CSharedFullSpillingWriteAhead::outputStopped closing tempfile writer: %s %s", tempFileOwner->queryIFile().queryFilename(), tracing.str()); closeWriter(); - iFile->remove(); + tempFileOwner->noteSize(0); } } } std::tuple getReadStream() // also pass back IFileIO for stats purposes { - return createSerialInputStream(iFile, compressHandler, options, numOutputs + 1); // +1 for writer + return createSerialInputStream(tempFileOwner->queryIFile(), compressHandler, options, numOutputs + 1); // +1 for writer } bool checkWriteAhead(rowcount_t &outputRowsAvailable) { @@ -2623,7 +2650,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf= options.inMemMaxMem) // too much in memory, spill { // NB: this will reset rowMemUsage, however, each reader will continue to consume rows until they catch up (or stop) - ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", iFile->queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size()); + ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", tempFileOwner->queryIFile().queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size()); createOutputStream(); return false; } @@ -2686,11 +2713,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfremove(); - } + closeWriter(); + tempFileOwner->noteSize(0); for (auto &output: outputs) output->reset(); freeRows(); @@ -2701,6 +2725,32 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfgetStatistic(useKind); + v += inactiveStats.getStatisticValue(useKind); + return v; + } }; ISharedRowStreamReader *createSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &options, IThorRowInterfaces *_rowIf, const char *tempFileName, ICompressHandler *compressHandler) diff --git a/thorlcr/thorutil/thbuf.hpp b/thorlcr/thorutil/thbuf.hpp index 1750f63b007..fb5a66af8fa 100644 --- a/thorlcr/thorutil/thbuf.hpp +++ b/thorlcr/thorutil/thbuf.hpp @@ -87,6 +87,7 @@ interface ISharedRowStreamReader : extends IInterface virtual IRowStream *queryOutput(unsigned output) = 0; virtual void cancel()=0; virtual void reset() = 0; + virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0; }; diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 48dc1231ee1..4d36f2e1686 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -92,8 +92,10 @@ const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSiz const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics); -const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakTempDisk}, diskWriteRemoteStatistics, basicActivityStatistics); +const StatisticsMapping hashDedupActivityStatistics({}, spillStatistics, diskWriteRemoteStatistics, basicActivityStatistics); const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics); +const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics); +const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics); MODULE_INIT(INIT_PRIORITY_STANDARD) { diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index d760f3d06da..4ba1cc18664 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -166,6 +166,8 @@ extern graph_decl const StatisticsMapping soapcallActivityStatistics; extern graph_decl const StatisticsMapping indexReadFileStatistics; extern graph_decl const StatisticsMapping hashDedupActivityStatistics; extern graph_decl const StatisticsMapping hashDistribActivityStatistics; +extern graph_decl const StatisticsMapping nsplitterActivityStatistics; +extern graph_decl const StatisticsMapping spillingWriteAheadStatistics; class BooleanOnOff {