Skip to content

Commit

Permalink
HPCC-32000 Spill stats for new nsplitter
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jun 25, 2024
1 parent f4392a3 commit c5c0127
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 34 deletions.
6 changes: 3 additions & 3 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,11 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
if (sharedRowStream)
sharedRowStream->cancel();
}
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const override
{
PARENT::gatherActiveStats(activeStats);
if (sharedSmartRowWriter)
mergeStats(activeStats, sharedSmartRowWriter);
if (sharedRowStream)
sharedRowStream->mergeStats(activeStats);
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
Expand Down
73 changes: 43 additions & 30 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1175,9 +1175,8 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu
queryCOutput(c).reset();
inMemRows->reset(0);
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
virtual void mergeStats(CRuntimeStatisticCollection & target) const override
{
return 0;
}
friend class COutput;
friend class CRowSet;
Expand Down Expand Up @@ -1233,8 +1232,6 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
Linked<IEngineRowAllocator> allocator;
Linked<IOutputRowDeserializer> deserializer;
IOutputMetaData *serializeMeta;
size_t tempSizeSpilled = 0;
stat_type tempFileElapsedCycles = 0;

struct AddRemoveFreeChunk
{
Expand Down Expand Up @@ -1513,10 +1510,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
mb.append((byte)0);
size32_t len = mb.length();
chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list
CCycleTimer startCycles;
tempFileIO->write(chunk->offset, len, mb.toByteArray());
tempFileElapsedCycles += startCycles.elapsedCycles();
tempSizeSpilled += len;
tempFileOwner->noteSize(highOffset);
#ifdef TRACE_WRITEAHEAD
ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len);
Expand Down Expand Up @@ -1576,19 +1570,19 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
tempFileIO->setSize(0);
tempFileOwner->noteSize(0);
}
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
virtual void mergeStats(CRuntimeStatisticCollection & target) const override
{
switch(kind)
::mergeStats(target, tempFileIO);
unsigned __int64 diskWriteSize = target.queryStatistic(StSizeDiskWrite).getClear();
if (diskWriteSize)
target.setStatistic(StSizeSpillFile, diskWriteSize);
unsigned __int64 ioTime = target.queryStatistic(StCycleDiskWriteIOCycles).getClear();
if (ioTime)
{
case StSizeSpillFile:
return tempSizeSpilled;
case StTimeSpillElapsed:
return cycle_to_nanosec(tempFileElapsedCycles);
case StNumSpills:
return 1;
default:
return 0;
target.queryStatistic(StTimeDiskWriteIO).getClear();
target.setStatistic(StTimeSpillElapsed, ioTime);
}
target.setStatistic(StNumSpills, 1);
}
};

Expand Down Expand Up @@ -1877,7 +1871,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
std::atomic<rowcount_t> totalInputRowsRead = 0; // not used until spilling begins, represents count of all rows read
rowcount_t inMemTotalRows = 0; // whilst in memory, represents count of all rows seen
CriticalSection readAheadCS; // ensure single reader (leader), reads ahead (updates rows/totalInputRowsRead/inMemTotalRows)
Owned<IFile> iFile;
Owned<CFileOwner> tempFileOwner;
Owned<IFileIO> iFileIO;
Owned<IBufferedSerialOutputStream> outputStream;
Linked<ICompressHandler> compressHandler;
Expand All @@ -1887,6 +1881,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
SharedRowStreamReaderOptions options;
size32_t inMemReadAheadGranularity = 0;
size32_t compressionBlockSize = 0;
CRuntimeStatisticCollection inactiveStats;


rowcount_t getLowestOutput()
{
Expand All @@ -1912,13 +1908,16 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
void closeWriter()
{
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
iFileIO.clear();
outputStream.clear();
}
void createOutputStream()
{
// NB: Called once, when spilling starts.
iFileIO.setown(iFile->open(IFOcreate)); // kept for stats purposes
iFileIO.setown(tempFileOwner->queryIFile().open(IFOcreate)); // kept for stats purposes
Owned<ISerialOutputStream> out = createSerialOutputStream(iFileIO);
outputStream.setown(createBufferedOutputStream(out, options.storageBlockSize)); //prefered plane block size
if (compressHandler)
Expand Down Expand Up @@ -1969,7 +1968,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);

tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand All @@ -1982,7 +1981,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
public:
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler)
: activity(*_activity), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer())
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()),
inactiveStats(spillingWriteAheadStatistics)
{
assertex(input);

Expand All @@ -2004,15 +2004,12 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
for (unsigned o=0; o<numOutputs; o++)
outputs.push_back(new COutputRowStream(*this, o));
iFile.setown(createIFile(tempFileName));
tempFileOwner.setown(activity.createOwnedTempFile(tempFileName));
}
~CSharedFullSpillingWriteAhead()
{
if (outputStream) // should have already been closed when inputs all stopped
{
closeWriter();
iFile->remove();
}
freeRows();
}
void outputStopped(unsigned output)
Expand All @@ -2031,15 +2028,15 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
StringBuffer tracing;
getFileIOStats(tracing, iFileIO);
activity.ActPrintLog("CSharedFullSpillingWriteAhead: removing spill file: %s%s", iFile->queryFilename(), tracing.str());
activity.ActPrintLog("CSharedFullSpillingWriteAhead: removing spill file: %s%s", tempFileOwner->queryIFile().queryFilename(), tracing.str());
closeWriter();
iFile->remove();
tempFileOwner.clear();
}
}
}
std::tuple<IBufferedSerialInputStream *, IFileIO *> getReadStream() // also pass back IFileIO for stats purposes
{
Owned<IFileIO> iFileIO = iFile->open(IFOread);
Owned<IFileIO> iFileIO = tempFileOwner->queryIFile().open(IFOread);
Owned<ISerialInputStream> in = createSerialInputStream(iFileIO);
Owned<IBufferedSerialInputStream> inputStream = createBufferedInputStream(in, options.storageBlockSize, 0);
if (compressHandler)
Expand Down Expand Up @@ -2096,7 +2093,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
if (rowsMemUsage >= options.inMemMaxMem) // too much in memory, spill
{
// NB: this will reset rowMemUsage, however, each reader will continue to consume rows until they catch up (or stop)
ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", iFile->queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size());
ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", tempFileOwner->queryIFile().queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size());
createOutputStream();
return false;
}
Expand Down Expand Up @@ -2162,7 +2159,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
if (outputStream) // should have already been closed when inputs all stopped
{
closeWriter();
iFile->remove();
tempFileOwner.clear();
}
for (auto &output: outputs)
output->reset();
Expand All @@ -2174,6 +2171,22 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
nextInputReadEog = false;
endOfInput = false;
}
virtual void mergeStats(CRuntimeStatisticCollection & target) const override
{
if (iFileIO)
::mergeStats(target, iFileIO);
target.merge(inactiveStats);
unsigned __int64 diskWriteSize = target.queryStatistic(StSizeDiskWrite).getClear();
if (diskWriteSize)
target.setStatistic(StSizeSpillFile, diskWriteSize);
unsigned __int64 ioTime = target.queryStatistic(StCycleDiskWriteIOCycles).getClear();
if (ioTime)
{
target.queryStatistic(StTimeDiskWriteIO).getClear();
target.setStatistic(StTimeSpillElapsed, ioTime);
}
target.setStatistic(StNumSpills, 1);
}
};

ISharedRowStreamReader *createSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &options, IThorRowInterfaces *_rowIf, const char *tempFileName, ICompressHandler *compressHandler)
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thbuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ interface ISharedRowStreamReader : extends IInterface
virtual IRowStream *queryOutput(unsigned output) = 0;
virtual void cancel()=0;
virtual void reset() = 0;
virtual void mergeStats(CRuntimeStatisticCollection & target) const = 0;
};


Expand All @@ -84,7 +85,6 @@ interface ISharedSmartBufferCallback
interface ISharedSmartBufferRowWriter : extends IRowWriter
{
virtual void putRow(const void *row, ISharedSmartBufferCallback *callback) = 0; // extended form of putRow, which signals when pages out via callback
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
};

interface ISharedSmartBuffer : extends ISharedRowStreamReader
Expand Down
1 change: 1 addition & 0 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics,
const StatisticsMapping hashDedupActivityStatistics({}, spillStatistics, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics);
const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics);
const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics);

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
Expand Down
1 change: 1 addition & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ extern graph_decl const StatisticsMapping indexReadFileStatistics;
extern graph_decl const StatisticsMapping hashDedupActivityStatistics;
extern graph_decl const StatisticsMapping hashDistribActivityStatistics;
extern graph_decl const StatisticsMapping nsplitterActivityStatistics;
extern graph_decl const StatisticsMapping spillingWriteAheadStatistics;

class BooleanOnOff
{
Expand Down

0 comments on commit c5c0127

Please sign in to comment.