From c5c0127a8b2277e552961eb511c4c6a8cfc55bf3 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Mon, 24 Jun 2024 12:11:04 +0100 Subject: [PATCH] HPCC-32000 Spill stats for new nsplitter Signed-off-by: Shamser Ahmed --- .../activities/nsplitter/thnsplitterslave.cpp | 6 +- thorlcr/thorutil/thbuf.cpp | 73 +++++++++++-------- thorlcr/thorutil/thbuf.hpp | 2 +- thorlcr/thorutil/thormisc.cpp | 1 + thorlcr/thorutil/thormisc.hpp | 1 + 5 files changed, 49 insertions(+), 34 deletions(-) diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index b19c92f2995..de5e5db6c54 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -401,11 +401,11 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf if (sharedRowStream) sharedRowStream->cancel(); } - virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const + virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const override { PARENT::gatherActiveStats(activeStats); - if (sharedSmartRowWriter) - mergeStats(activeStats, sharedSmartRowWriter); + if (sharedRowStream) + sharedRowStream->mergeStats(activeStats); } // ISharedSmartBufferCallback impl. virtual void paged() { pagedOut = true; } diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 0b92f4ec559..9c0e6dcfa4d 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -1175,9 +1175,8 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu queryCOutput(c).reset(); inMemRows->reset(0); } - virtual unsigned __int64 getStatistic(StatisticKind kind) const override + virtual void mergeStats(CRuntimeStatisticCollection & target) const override { - return 0; } friend class COutput; friend class CRowSet; @@ -1233,8 +1232,6 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase Linked allocator; Linked deserializer; IOutputMetaData *serializeMeta; - size_t tempSizeSpilled = 0; - stat_type tempFileElapsedCycles = 0; struct AddRemoveFreeChunk { @@ -1513,10 +1510,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase mb.append((byte)0); size32_t len = mb.length(); chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list - CCycleTimer startCycles; tempFileIO->write(chunk->offset, len, mb.toByteArray()); - tempFileElapsedCycles += startCycles.elapsedCycles(); - tempSizeSpilled += len; tempFileOwner->noteSize(highOffset); #ifdef TRACE_WRITEAHEAD ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len); @@ -1576,19 +1570,19 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase tempFileIO->setSize(0); tempFileOwner->noteSize(0); } - virtual unsigned __int64 getStatistic(StatisticKind kind) const override + virtual void mergeStats(CRuntimeStatisticCollection & target) const override { - switch(kind) + ::mergeStats(target, tempFileIO); + unsigned __int64 diskWriteSize = target.queryStatistic(StSizeDiskWrite).getClear(); + if (diskWriteSize) + target.setStatistic(StSizeSpillFile, diskWriteSize); + unsigned __int64 ioTime = target.queryStatistic(StCycleDiskWriteIOCycles).getClear(); + if (ioTime) { - case StSizeSpillFile: - return tempSizeSpilled; - case StTimeSpillElapsed: - return cycle_to_nanosec(tempFileElapsedCycles); - case StNumSpills: - return 1; - default: - return 0; + target.queryStatistic(StTimeDiskWriteIO).getClear(); + target.setStatistic(StTimeSpillElapsed, ioTime); } + target.setStatistic(StNumSpills, 1); } }; @@ -1877,7 +1871,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf totalInputRowsRead = 0; // not used until spilling begins, represents count of all rows read rowcount_t inMemTotalRows = 0; // whilst in memory, represents count of all rows seen CriticalSection readAheadCS; // ensure single reader (leader), reads ahead (updates rows/totalInputRowsRead/inMemTotalRows) - Owned iFile; + Owned tempFileOwner; Owned iFileIO; Owned outputStream; Linked compressHandler; @@ -1887,6 +1881,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); + tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); + ::mergeStats(inactiveStats, iFileIO); iFileIO.clear(); outputStream.clear(); } void createOutputStream() { // NB: Called once, when spilling starts. - iFileIO.setown(iFile->open(IFOcreate)); // kept for stats purposes + iFileIO.setown(tempFileOwner->queryIFile().open(IFOcreate)); // kept for stats purposes Owned out = createSerialOutputStream(iFileIO); outputStream.setown(createBufferedOutputStream(out, options.storageBlockSize)); //prefered plane block size if (compressHandler) @@ -1969,7 +1968,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfflush(); totalInputRowsRead.fetch_add(newRowsWritten); - + tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite)); // JCSMORE - could track size written, and start new file at this point (e.g. every 100MB), // and track their starting points (by row #) in a vector // We could then tell if/when the readers catch up, and remove consumed files as they do. @@ -1982,7 +1981,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfqueryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()) + meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), + inactiveStats(spillingWriteAheadStatistics) { assertex(input); @@ -2004,15 +2004,12 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfremove(); - } freeRows(); } void outputStopped(unsigned output) @@ -2031,15 +2028,15 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfqueryFilename(), tracing.str()); + activity.ActPrintLog("CSharedFullSpillingWriteAhead: removing spill file: %s%s", tempFileOwner->queryIFile().queryFilename(), tracing.str()); closeWriter(); - iFile->remove(); + tempFileOwner.clear(); } } } std::tuple getReadStream() // also pass back IFileIO for stats purposes { - Owned iFileIO = iFile->open(IFOread); + Owned iFileIO = tempFileOwner->queryIFile().open(IFOread); Owned in = createSerialInputStream(iFileIO); Owned inputStream = createBufferedInputStream(in, options.storageBlockSize, 0); if (compressHandler) @@ -2096,7 +2093,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf= options.inMemMaxMem) // too much in memory, spill { // NB: this will reset rowMemUsage, however, each reader will continue to consume rows until they catch up (or stop) - ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", iFile->queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size()); + ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", tempFileOwner->queryIFile().queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size()); createOutputStream(); return false; } @@ -2162,7 +2159,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOfremove(); + tempFileOwner.clear(); } for (auto &output: outputs) output->reset(); @@ -2174,6 +2171,22 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf