Skip to content

Commit

Permalink
HPCC-32000 Spill stats for nsplitter
Browse files Browse the repository at this point in the history
StSizePeakEphemeralDisk, StSizePeakTempDisk, StNumSpills, and
StSizeSpillFile for nsplitter implemented.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jun 27, 2024
1 parent 30cae58 commit 2a73c06
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 24 deletions.
8 changes: 7 additions & 1 deletion thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
}
}
public:
NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), writer(*this)
NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container, nsplitterActivityStatistics), writer(*this)
{
numOutputs = container.getOutputs();
connectedOutputSet.setown(createBitSet());
Expand Down Expand Up @@ -401,6 +401,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
if (sharedRowStream)
sharedRowStream->cancel();
}
// Gathers this activity's statistics into 'activeStats'.
// The parent implementation is invoked first; afterwards, if a shared row
// stream exists, its statistics are merged into the same collection.
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const override
{
PARENT::gatherActiveStats(activeStats);
// sharedRowStream may be unset, so only merge when it is present
if (sharedRowStream)
::mergeStats(activeStats, sharedRowStream);
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
virtual void blocked()
Expand Down
4 changes: 3 additions & 1 deletion thorlcr/master/thactivitymaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ class CGenericMasterGraphElement : public CMasterGraphElement
case TAKcase:
case TAKchildcase:
case TAKdegroup:
case TAKsplit:
case TAKproject:
case TAKprefetchproject:
case TAKprefetchcountproject:
Expand Down Expand Up @@ -210,6 +209,9 @@ class CGenericMasterGraphElement : public CMasterGraphElement
case TAKemptyaction:
ret = new CMasterActivity(this);
break;
case TAKsplit:
ret = new CMasterActivity(this, nsplitterActivityStatistics);
break;
case TAKsoap_rowdataset:
case TAKsoap_rowaction:
case TAKsoap_datasetdataset:
Expand Down
92 changes: 71 additions & 21 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,10 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu
queryCOutput(c).reset();
inMemRows->reset(0);
}
// Base implementation of getStatistic: reports 0 for every statistic kind.
// Derived classes (e.g. CSharedWriteAheadDisk) override this to report
// values taken from their temp file.
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
return 0;
}
friend class COutput;
friend class CRowSet;
};
Expand Down Expand Up @@ -2145,6 +2149,24 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
tempFileIO->setSize(0);
tempFileOwner->noteSize(0);
}
// Reports spill-related statistics for the disk-backed write-ahead buffer.
//  - StNumSpills                      : always 1
//  - raw disk-write kinds (StCycleDiskWriteIOCycles, StTimeDiskWriteIO,
//    StSizeDiskWrite)                 : always 0; the write size and write
//    cycles are surfaced via StSizeSpillFile / StTimeSpillElapsed instead
//  - StSizeSpillFile                  : tempFileIO's StSizeDiskWrite
//  - StTimeSpillElapsed               : tempFileIO's StCycleDiskWriteIOCycles
//  - anything else                    : forwarded unchanged to tempFileIO
// NOTE(review): StNumSpills is a constant 1 regardless of whether anything was
// written - confirm that is intended.
// NOTE(review): a "Time" kind is served from a "Cycle" kind here - confirm the
// units are what the consumer of StTimeSpillElapsed expects.
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
    if (StNumSpills == kind)
        return 1;

    const bool isRawDiskWriteKind = (StCycleDiskWriteIOCycles == kind)
                                 || (StTimeDiskWriteIO == kind)
                                 || (StSizeDiskWrite == kind);
    if (isRawDiskWriteKind)
        return 0;

    // Translate spill kinds into the disk-write kind the file IO object tracks
    StatisticKind ioKind = kind;
    if (StSizeSpillFile == kind)
        ioKind = StSizeDiskWrite;
    else if (StTimeSpillElapsed == kind)
        ioKind = StCycleDiskWriteIOCycles;

    return tempFileIO->getStatistic(ioKind);
}
};

ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IThorRowInterfaces *rowIf)
Expand Down Expand Up @@ -2433,7 +2455,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
std::atomic<rowcount_t> totalInputRowsRead = 0; // not used until spilling begins, represents count of all rows read
rowcount_t inMemTotalRows = 0; // whilst in memory, represents count of all rows seen
CriticalSection readAheadCS; // ensure single reader (leader), reads ahead (updates rows/totalInputRowsRead/inMemTotalRows)
Owned<IFile> iFile;
Owned<CFileOwner> tempFileOwner;
Owned<IFileIO> iFileIO;
Owned<IBufferedSerialOutputStream> outputStream;
Linked<ICompressHandler> compressHandler;
Expand All @@ -2442,6 +2464,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
bool inputGrouped = false;
SharedRowStreamReaderOptions options;
size32_t inMemReadAheadGranularity = 0;
CRuntimeStatisticCollection inactiveStats;


rowcount_t getLowestOutput()
{
Expand All @@ -2467,13 +2491,19 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
void closeWriter()
{
iFileIO.clear();
outputStream.clear();
if (outputStream)
{
iFileIO->flush();
tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
::mergeStats(inactiveStats, iFileIO);
iFileIO.clear();
outputStream.clear();
}
}
void createOutputStream()
{
// NB: Called once, when spilling starts.
auto res = createSerialOutputStream(iFile, compressHandler, options, numOutputs + 1);
auto res = createSerialOutputStream(tempFileOwner->queryIFile(), compressHandler, options, numOutputs + 1);
outputStream.setown(std::get<0>(res));
iFileIO.setown(std::get<1>(res));
totalInputRowsRead = inMemTotalRows;
Expand Down Expand Up @@ -2517,7 +2547,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
outputStream->flush();
totalInputRowsRead.fetch_add(newRowsWritten);

tempFileOwner->noteSize(iFileIO->getStatistic(StSizeDiskWrite));
// JCSMORE - could track size written, and start new file at this point (e.g. every 100MB),
// and track their starting points (by row #) in a vector
// We could then tell if/when the readers catch up, and remove consumed files as they do.
Expand All @@ -2530,7 +2560,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
public:
explicit CSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned _numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &_options, IThorRowInterfaces *rowIf, const char *tempFileName, ICompressHandler *_compressHandler)
: activity(*_activity), numOutputs(_numOutputs), input(_input), inputGrouped(_inputGrouped), options(_options), compressHandler(_compressHandler),
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer())
meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer()), allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()),
inactiveStats(spillingWriteAheadStatistics)
{
assertex(input);

Expand All @@ -2541,15 +2572,11 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader

for (unsigned o=0; o<numOutputs; o++)
outputs.push_back(new COutputRowStream(*this, o));
iFile.setown(createIFile(tempFileName));
tempFileOwner.setown(activity.createOwnedTempFile(tempFileName));
}
~CSharedFullSpillingWriteAhead()
{
if (outputStream) // should have already been closed when inputs all stopped
{
closeWriter();
iFile->remove();
}
closeWriter();
freeRows();
}
void outputStopped(unsigned output)
Expand All @@ -2568,15 +2595,15 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
{
StringBuffer tracing;
getFileIOStats(tracing, iFileIO);
activity.ActPrintLog("CSharedFullSpillingWriteAhead: removing spill file: %s%s", iFile->queryFilename(), tracing.str());
activity.ActPrintLog("CSharedFullSpillingWriteAhead::outputStopped closing tempfile writer: %s %s", tempFileOwner->queryIFile().queryFilename(), tracing.str());
closeWriter();
iFile->remove();
tempFileOwner->noteSize(0);
}
}
}
std::tuple<IBufferedSerialInputStream *, IFileIO *> getReadStream() // also pass back IFileIO for stats purposes
{
return createSerialInputStream(iFile, compressHandler, options, numOutputs + 1); // +1 for writer
return createSerialInputStream(tempFileOwner->queryIFile(), compressHandler, options, numOutputs + 1); // +1 for writer
}
bool checkWriteAhead(rowcount_t &outputRowsAvailable)
{
Expand Down Expand Up @@ -2623,7 +2650,7 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
if (rowsMemUsage >= options.inMemMaxMem) // too much in memory, spill
{
// NB: this will reset rowMemUsage, however, each reader will continue to consume rows until they catch up (or stop)
ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", iFile->queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size());
ActPrintLog(&activity, "Spilling to temp storage [file = %s, outputRowsAvailable = %" I64F "u, start = %" I64F "u, end = %" I64F "u, count = %u]", tempFileOwner->queryIFile().queryFilename(), outputRowsAvailable, inMemTotalRows - rows.size(), inMemTotalRows, (unsigned)rows.size());
createOutputStream();
return false;
}
Expand Down Expand Up @@ -2686,11 +2713,8 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
}
virtual void reset() override
{
if (outputStream) // should have already been closed when inputs all stopped
{
closeWriter();
iFile->remove();
}
closeWriter();
tempFileOwner->noteSize(0);
for (auto &output: outputs)
output->reset();
freeRows();
Expand All @@ -2701,6 +2725,32 @@ class CSharedFullSpillingWriteAhead : public CInterfaceOf<ISharedRowStreamReader
nextInputReadEog = false;
endOfInput = false;
}
// Reports spill-related statistics for the full-spilling write-ahead reader.
//  - StNumSpills                      : always 1
//  - raw disk-write kinds (StCycleDiskWriteIOCycles, StTimeDiskWriteIO,
//    StSizeDiskWrite)                 : always 0
//  - StSizeSpillFile                  : looked up as StSizeDiskWrite
//  - StTimeSpillElapsed               : looked up as StCycleDiskWriteIOCycles
//  - anything else                    : looked up under the requested kind
// For looked-up kinds the result is the value from the current iFileIO (when
// one exists) plus the value held in inactiveStats, which closeWriter() merges
// the file IO's statistics into before releasing it.
// NOTE(review): StNumSpills is a constant 1 even if no spill file was ever
// opened - confirm that is intended.
// NOTE(review): a "Time" kind is served from a "Cycle" kind here - confirm the
// units are what the consumer of StTimeSpillElapsed expects.
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
    if (StNumSpills == kind)
        return 1;

    const bool isRawDiskWriteKind = (StCycleDiskWriteIOCycles == kind)
                                 || (StTimeDiskWriteIO == kind)
                                 || (StSizeDiskWrite == kind);
    if (isRawDiskWriteKind)
        return 0;

    // Translate spill kinds into the disk-write kind tracked by the file IO
    StatisticKind lookupKind = kind;
    if (StSizeSpillFile == kind)
        lookupKind = StSizeDiskWrite;
    else if (StTimeSpillElapsed == kind)
        lookupKind = StCycleDiskWriteIOCycles;

    // Live writer contribution (absent before spilling starts / after close)
    unsigned __int64 total = 0;
    if (likely(iFileIO))
        total = iFileIO->getStatistic(lookupKind);

    // Plus whatever was accumulated from previously closed writers
    total += inactiveStats.getStatisticValue(lookupKind);
    return total;
}
};

ISharedRowStreamReader *createSharedFullSpillingWriteAhead(CActivityBase *_activity, unsigned numOutputs, IRowStream *_input, bool _inputGrouped, const SharedRowStreamReaderOptions &options, IThorRowInterfaces *_rowIf, const char *tempFileName, ICompressHandler *compressHandler)
Expand Down
1 change: 1 addition & 0 deletions thorlcr/thorutil/thbuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ interface ISharedRowStreamReader : extends IInterface
virtual IRowStream *queryOutput(unsigned output) = 0;
virtual void cancel()=0;
virtual void reset() = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
};


Expand Down
4 changes: 3 additions & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSiz
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakTempDisk}, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDedupActivityStatistics({}, spillStatistics, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics);
const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics);
const StatisticsMapping spillingWriteAheadStatistics({}, spillStatistics);

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ extern graph_decl const StatisticsMapping soapcallActivityStatistics;
extern graph_decl const StatisticsMapping indexReadFileStatistics;
extern graph_decl const StatisticsMapping hashDedupActivityStatistics;
extern graph_decl const StatisticsMapping hashDistribActivityStatistics;
extern graph_decl const StatisticsMapping nsplitterActivityStatistics;
extern graph_decl const StatisticsMapping spillingWriteAheadStatistics;

class BooleanOnOff
{
Expand Down

0 comments on commit 2a73c06

Please sign in to comment.