diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index 56c5dbe0c91..7b47c37539b 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -148,7 +148,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf } } public: - NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), writer(*this) + NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container, nsplitterActivityStatistics), writer(*this) { numOutputs = container.getOutputs(); connectedOutputSet.setown(createBitSet()); @@ -360,6 +360,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf if (smartBuf) smartBuf->cancel(); } + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const + { + PARENT::gatherActiveStats(activeStats); + if (smartBuf) + mergeStats(activeStats, smartBuf); + } // ISharedSmartBufferCallback impl. virtual void paged() { pagedOut = true; } virtual void blocked() diff --git a/thorlcr/master/thactivitymaster.cpp b/thorlcr/master/thactivitymaster.cpp index 82ca2d647a9..f80ee0bd258 100644 --- a/thorlcr/master/thactivitymaster.cpp +++ b/thorlcr/master/thactivitymaster.cpp @@ -170,7 +170,6 @@ class CGenericMasterGraphElement : public CMasterGraphElement case TAKcase: case TAKchildcase: case TAKdegroup: - case TAKsplit: case TAKproject: case TAKprefetchproject: case TAKprefetchcountproject: @@ -210,6 +209,9 @@ class CGenericMasterGraphElement : public CMasterGraphElement case TAKemptyaction: ret = new CMasterActivity(this); break; + case TAKsplit: + ret = new CMasterActivity(this, nsplitterActivityStatistics); + break; case TAKsoap_rowdataset: case TAKsoap_rowaction: case TAKsoap_datasetdataset: diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index d64d06da1a2..43798dd73bb 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -1175,6 +1175,10 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu queryCOutput(c).reset(); inMemRows->reset(0); } + virtual unsigned __int64 getStatistic(StatisticKind kind) override + { + return 0; + } friend class COutput; friend class CRowSet; }; @@ -1197,7 +1201,7 @@ bool CRowSet::Release() const class CSharedWriteAheadDisk : public CSharedWriteAheadBase { - Owned spillFile; + Owned spillFile; Owned spillFileIO; CIArrayOf freeChunks; PointerArrayOf freeChunksSized; @@ -1206,6 +1210,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase Linked allocator; Linked deserializer; IOutputMetaData *serializeMeta; + size_t sizeSpilled = 0; + stat_type spillElapsedCycles = 0; struct AddRemoveFreeChunk { @@ -1484,7 +1490,11 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase mb.append((byte)0); size32_t len = mb.length(); chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list + CCycleTimer startCycles; spillFileIO->write(chunk->offset, len, mb.toByteArray()); + spillElapsedCycles += startCycles.elapsedCycles(); + sizeSpilled += len; + spillFile->noteSize(highOffset); #ifdef TRACE_WRITEAHEAD ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len); #endif @@ -1507,16 +1517,15 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()) { assertex(spillName); - spillFile.setown(createIFile(spillName)); - spillFile->setShareMode(IFSHnone); - spillFileIO.setown(spillFile->open(IFOcreaterw)); + Owned tempFile = createIFile(spillName); + tempFile->setShareMode(IFSHnone); + spillFile.setown(new CFileOwner(tempFile, activity->queryTempFileSizeTracker())); + spillFileIO.setown(spillFile->queryIFile().open(IFOcreaterw)); highOffset = 0; } ~CSharedWriteAheadDisk() { spillFileIO.clear(); - if (spillFile) - spillFile->remove(); for (;;) { @@ -1538,6 +1547,20 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase highOffset = 0; spillFileIO->setSize(0); } + virtual unsigned __int64 getStatistic(StatisticKind kind) override + { + switch(kind) + { + case StSizeSpillFile: + return sizeSpilled; + case StTimeSpillElapsed: + return cycle_to_nanosec(spillElapsedCycles); + case StNumSpills: + return 1; + default: + return 0; + } + } }; ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IThorRowInterfaces *rowIf) diff --git a/thorlcr/thorutil/thbuf.hpp b/thorlcr/thorutil/thbuf.hpp index f75b353951f..15a6813828e 100644 --- a/thorlcr/thorutil/thbuf.hpp +++ b/thorlcr/thorutil/thbuf.hpp @@ -69,6 +69,7 @@ interface ISharedSmartBuffer : extends IRowWriter virtual IRowStream *queryOutput(unsigned output) = 0; virtual void cancel()=0; virtual void reset() = 0; + virtual unsigned __int64 getStatistic(StatisticKind kind) = 0; }; extern graph_decl ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IThorRowInterfaces *rowif, unsigned buffSize=((unsigned)-1)); diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index a97fc94c5f8..1ec2f83d2c6 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -73,7 +73,7 @@ Owned globals; static Owned ClusterMPAllocator; // stat. mappings shared between master and slave activities -const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile}); +const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk}); const StatisticsMapping soapcallStatistics({StTimeSoapcall}); const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); @@ -92,8 +92,9 @@ const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSiz const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics); const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics); -const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakTempDisk}, diskWriteRemoteStatistics, basicActivityStatistics); +const StatisticsMapping hashDedupActivityStatistics({}, spillStatistics, diskWriteRemoteStatistics, basicActivityStatistics); const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics); +const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics); MODULE_INIT(INIT_PRIORITY_STANDARD) { diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 2bc8f0c776d..c37a8433cde 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -156,6 +156,7 @@ extern graph_decl const StatisticsMapping soapcallActivityStatistics; extern graph_decl const StatisticsMapping indexReadFileStatistics; extern graph_decl const StatisticsMapping hashDedupActivityStatistics; extern graph_decl const StatisticsMapping hashDistribActivityStatistics; +extern graph_decl const StatisticsMapping nsplitterActivityStatistics; class BooleanOnOff { @@ -356,9 +357,14 @@ class graph_decl CFileOwner : public CSimpleInterface, implements IInterface } void noteSize(offset_t size) { - fileSize = size; - if (fileSizeTracker) - fileSizeTracker->growSize(fileSize); + if (fileSizeTracker && fileSize!=size) + { + if (size > fileSize) + fileSizeTracker->growSize(size-fileSize); + else + fileSizeTracker->shrinkSize(fileSize-size); + fileSize = size; + } } IFile &queryIFile() const { return *iFile; } };