Skip to content

Commit

Permalink
HPCC-32000 Spill stats for nsplitter
Browse files Browse the repository at this point in the history
StSizePeakEphemeralDisk, StSizePeakTempDisk, StNumSpills, and
StSizeSpillFile for nsplitter implemented.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jun 12, 2024
1 parent 58aa3e6 commit 4eab8f9
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 13 deletions.
8 changes: 7 additions & 1 deletion thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
}
}
public:
NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), writer(*this)
NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container, nsplitterActivityStatistics), writer(*this)
{
numOutputs = container.getOutputs();
connectedOutputSet.setown(createBitSet());
Expand Down Expand Up @@ -360,6 +360,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
if (smartBuf)
smartBuf->cancel();
}
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
PARENT::gatherActiveStats(activeStats);
if (smartBuf)
mergeStats(activeStats, smartBuf);
}
// ISharedSmartBufferCallback impl.
virtual void paged() { pagedOut = true; }
virtual void blocked()
Expand Down
4 changes: 3 additions & 1 deletion thorlcr/master/thactivitymaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ class CGenericMasterGraphElement : public CMasterGraphElement
case TAKcase:
case TAKchildcase:
case TAKdegroup:
case TAKsplit:
case TAKproject:
case TAKprefetchproject:
case TAKprefetchcountproject:
Expand Down Expand Up @@ -210,6 +209,9 @@ class CGenericMasterGraphElement : public CMasterGraphElement
case TAKemptyaction:
ret = new CMasterActivity(this);
break;
case TAKsplit:
ret = new CMasterActivity(this, nsplitterActivityStatistics);
break;
case TAKsoap_rowdataset:
case TAKsoap_rowaction:
case TAKsoap_datasetdataset:
Expand Down
35 changes: 29 additions & 6 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,10 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu
queryCOutput(c).reset();
inMemRows->reset(0);
}
virtual unsigned __int64 getStatistic(StatisticKind kind) override
{
return 0;
}
friend class COutput;
friend class CRowSet;
};
Expand All @@ -1197,7 +1201,7 @@ bool CRowSet::Release() const

class CSharedWriteAheadDisk : public CSharedWriteAheadBase
{
Owned<IFile> spillFile;
Owned<CFileOwner> spillFile;
Owned<IFileIO> spillFileIO;
CIArrayOf<Chunk> freeChunks;
PointerArrayOf<Chunk> freeChunksSized;
Expand All @@ -1206,6 +1210,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
Linked<IEngineRowAllocator> allocator;
Linked<IOutputRowDeserializer> deserializer;
IOutputMetaData *serializeMeta;
size_t sizeSpilled = 0;
stat_type spillElapsedCycles = 0;

struct AddRemoveFreeChunk
{
Expand Down Expand Up @@ -1484,7 +1490,11 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
mb.append((byte)0);
size32_t len = mb.length();
chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list
CCycleTimer startCycles;
spillFileIO->write(chunk->offset, len, mb.toByteArray());
spillElapsedCycles += startCycles.elapsedCycles();
sizeSpilled += len;
spillFile->noteSize(highOffset);
#ifdef TRACE_WRITEAHEAD
ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len);
#endif
Expand All @@ -1507,16 +1517,15 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta())
{
assertex(spillName);
spillFile.setown(createIFile(spillName));
spillFile->setShareMode(IFSHnone);
spillFileIO.setown(spillFile->open(IFOcreaterw));
Owned<IFile> tempFile = createIFile(spillName);
tempFile->setShareMode(IFSHnone);
spillFile.setown(new CFileOwner(tempFile, activity->queryTempFileSizeTracker()));
spillFileIO.setown(spillFile->queryIFile().open(IFOcreaterw));
highOffset = 0;
}
~CSharedWriteAheadDisk()
{
spillFileIO.clear();
if (spillFile)
spillFile->remove();

for (;;)
{
Expand All @@ -1538,6 +1547,20 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
highOffset = 0;
spillFileIO->setSize(0);
}
virtual unsigned __int64 getStatistic(StatisticKind kind) override
{
switch(kind)
{
case StSizeSpillFile:
return sizeSpilled;
case StTimeSpillElapsed:
return cycle_to_nanosec(spillElapsedCycles);
case StNumSpills:
return 1;
default:
return 0;
}
}
};

ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IThorRowInterfaces *rowIf)
Expand Down
1 change: 1 addition & 0 deletions thorlcr/thorutil/thbuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ interface ISharedSmartBuffer : extends IRowWriter
virtual IRowStream *queryOutput(unsigned output) = 0;
virtual void cancel()=0;
virtual void reset() = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
};

extern graph_decl ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IThorRowInterfaces *rowif, unsigned buffSize=((unsigned)-1));
Expand Down
5 changes: 3 additions & 2 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Owned<IPropertyTree> globals;
static Owned<IMPtagAllocator> ClusterMPAllocator;

// stat. mappings shared between master and slave activities
const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile});
const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk});
const StatisticsMapping soapcallStatistics({StTimeSoapcall});
const StatisticsMapping basicActivityStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked});
const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
Expand All @@ -92,8 +92,9 @@ const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSiz
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakTempDisk}, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDedupActivityStatistics({}, spillStatistics, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics);
const StatisticsMapping nsplitterActivityStatistics({}, spillStatistics, basicActivityStatistics);

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
Expand Down
12 changes: 9 additions & 3 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ extern graph_decl const StatisticsMapping soapcallActivityStatistics;
extern graph_decl const StatisticsMapping indexReadFileStatistics;
extern graph_decl const StatisticsMapping hashDedupActivityStatistics;
extern graph_decl const StatisticsMapping hashDistribActivityStatistics;
extern graph_decl const StatisticsMapping nsplitterActivityStatistics;

class BooleanOnOff
{
Expand Down Expand Up @@ -356,9 +357,14 @@ class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
}
void noteSize(offset_t size)
{
fileSize = size;
if (fileSizeTracker)
fileSizeTracker->growSize(fileSize);
if (fileSizeTracker && fileSize!=size)
{
if (size > fileSize)
fileSizeTracker->growSize(size-fileSize);
else
fileSizeTracker->shrinkSize(fileSize-size);
fileSize = size;
}
}
IFile &queryIFile() const { return *iFile; }
};
Expand Down

0 comments on commit 4eab8f9

Please sign in to comment.