Skip to content

Commit

Permalink
HPCC-28757 Report the peak activity and graph spill
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Apr 8, 2024
1 parent cbb5711 commit 6f81a81
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 8 deletions.
1 change: 1 addition & 0 deletions system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ enum StatisticKind
StSizeContinuationData,
StNumContinuationRequests,
StNumFailures,
StSizePeakSpill,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ SIZESTAT(ContinuationData), "The total size of continuation data sent from agent to the server\nA large number may indicate a poor filter, or merging from many different index locations" },
{ NUMSTAT(ContinuationRequests), "The number of times the agent indicated there was more data to be returned" },
{ NUMSTAT(Failures), "The number of times a query has failed" },
{ PEAKSIZESTAT(PeakSpill), "The high water mark for spill files"},
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down
21 changes: 16 additions & 5 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2648,17 +2648,18 @@ class CSpill : implements IRowWriter, public CSimpleInterface
CActivityBase &owner;
IThorRowInterfaces *rowIf;
rowcount_t count;
Owned<CFileOwner> spillFile;
Owned<CFileOwnerSizeUpdater> spillFile;
Owned<IFileIO> spillFileIO;
IRowWriter *writer;
StringAttr desc;
unsigned bucketN, rwFlags;
Linked<FilesSizeTracker> spillsSizeTracker;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN)
CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, FilesSizeTracker * _spillsSizeTracker)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), spillsSizeTracker(_spillsSizeTracker)
{
count = 0;
writer = NULL;
Expand All @@ -2676,7 +2677,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
prefix.append(bucketN).append('_').append(desc);
GetTempFilePath(tempname, prefix.str());
OwnedIFile iFile = createIFile(tempname.str());
spillFile.setown(new CFileOwner(iFile.getLink()));
spillFile.setown(new CFileOwnerSizeUpdater(iFile.getLink(), spillsSizeTracker));
if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))
{
rwFlags |= rw_compress;
Expand Down Expand Up @@ -2712,6 +2713,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
writer = NULL;
spillFileIO->flush();
mergeStats(stats, this);
spillFile->noteSpill(getStatistic(StSizeSpillFile));
spillFileIO.clear();
}
inline __int64 getStatistic(StatisticKind kind) const
Expand Down Expand Up @@ -3174,6 +3176,7 @@ class HashDedupSlaveActivityBase : public CSlaveActivity
bucketHandlerStack.kill();
CSlaveActivity::kill();
}

CATCH_NEXTROW()
{
ActivityTimer t(slaveTimerStats, timeActivities);
Expand Down Expand Up @@ -3299,6 +3302,14 @@ class HashDedupSlaveActivityBase : public CSlaveActivity

virtual bool isGrouped() const override { return grouped; }
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override = 0;

virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
PARENT::gatherActiveStats(activeStats);
unsigned __int64 peakSpill = queryPeakSpillSize();
activeStats.setStatistic(StSizePeakSpill, peakSpill);
}

friend class CBucketHandler;
friend class CHashTableRowTable;
friend class CBucket;
Expand Down Expand Up @@ -3373,7 +3384,7 @@ void CHashTableRowTable::rehash(const void **newRows)

CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
: owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
rowSpill(owner, _rowIf, "rows", _bucketN, _owner.queryFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryFileSizeTracker())

{
spilt = false;
Expand Down
20 changes: 19 additions & 1 deletion thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1280,8 +1280,26 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats);

IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler();
offset_t sizeGraphSpill = tempHandler ? tempHandler->getActiveUsageSize() : 0;
if (tempHandler)
stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize());
stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill);

// calculate peak spill size
if (started&&initialized)
{
unsigned __int64 activeSpillSize = 0;
Owned<IThorActivityIterator> iter = getConnectedIterator();
ForEach (*iter)
{
CGraphElementBase &element = iter->query();
CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
activeSpillSize += activity.queryActiveSpillSize();
}
if (activeSpillSize > peakSpillSize)
peakSpillSize = activeSpillSize;
}
if (peakSpillSize + sizeGraphSpill)
stats.mergeStatistic(StSizePeakSpill, peakSpillSize + sizeGraphSpill);
stats.serialize(mb);

unsigned cPos = mb.length();
Expand Down
16 changes: 16 additions & 0 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
// fileStats is in this base class as it used by multiple derived classes (both slave and master) but not all.
// (Having it in the base class aids setup and resizing.)
mutable std::vector<OwnedPtr<CRuntimeStatisticCollection>> fileStats;
Owned<FilesSizeTracker> spillsSizeTracker;

protected:
unsigned __int64 queryLocalCycles() const;
Expand Down Expand Up @@ -262,6 +263,20 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
bool canStall() const;
bool isFastThrough() const;

FilesSizeTracker * queryFileSizeTracker()
{
if (!spillsSizeTracker)
spillsSizeTracker.setown(new FilesSizeTracker);
return spillsSizeTracker;
}
unsigned __int64 queryActiveSpillSize() const
{
return spillsSizeTracker ? spillsSizeTracker->queryActiveSize() : 0;
}
unsigned __int64 queryPeakSpillSize() const
{
return spillsSizeTracker ? spillsSizeTracker->queryPeakSize() : 0;
}

// IThorDataLink
virtual CSlaveActivity *queryFromActivity() override { return this; }
Expand Down Expand Up @@ -516,6 +531,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase
bool doneInit = false;
std::atomic_bool progressActive;
ProcessInfo processStartInfo;
unsigned __int64 peakSpillSize = 0;

public:

Expand Down
4 changes: 2 additions & 2 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows},
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics);
const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed}, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakSpill}, diskWriteRemoteStatistics, basicActivityStatistics);

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
Expand Down
48 changes: 48 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,54 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
}
};

// Tracks the current and peak storage used for some files
class FilesSizeTracker: public CInterface
{
RelaxedAtomic<unsigned __int64> activeSize{0};
RelaxedAtomic<unsigned __int64> peakSize{0};
public:
void growSize(unsigned __int64 size)
{
if (size)
{
unsigned __int64 newActiveSize = activeSize.add_fetch(size);
peakSize.store_max(newActiveSize);
}
}
void shrinkSize(unsigned __int64 size)
{
if (size)
activeSize.fetch_sub(size);
}
unsigned __int64 queryActiveSize() const
{
return activeSize.load();
}
unsigned __int64 queryPeakSize() const
{
return peakSize.load();
}
};

class CFileOwnerSizeUpdater : public CFileOwner
{
Linked<FilesSizeTracker> filesSizeTracker;
__int64 fileSize = 0;
public:
CFileOwnerSizeUpdater(IFile *_iFile, FilesSizeTracker * _filesSizeTracker): CFileOwner(_iFile), filesSizeTracker(_filesSizeTracker)
{}

~CFileOwnerSizeUpdater()
{
filesSizeTracker->shrinkSize(fileSize);
}

void noteSpill(__int64 size)
{
fileSize = size;
filesSizeTracker->growSize(fileSize);
}
};

#define DEFAULT_THORMASTERPORT 20000
#define DEFAULT_THORSLAVEPORT 20100
Expand Down

0 comments on commit 6f81a81

Please sign in to comment.