From f6730cbb3760244cbd50cf4c8243683c160c48be Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Fri, 1 Nov 2024 15:56:41 +0000 Subject: [PATCH] HPCC-32931 Capture and report execute timings for splitters Signed-off-by: Shamser Ahmed --- thorlcr/activities/nsplitter/thnsplitterslave.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index d1d8f2e2e8e..9a0fe51801f 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -101,6 +101,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf bool inputConnected = false; unsigned numOutputs = 0; ThorDataLinkMetaInfo cachedMetaInfo; + std::unique_ptr blockedActivityTimer; // NB: CWriter only used by 'balanced' splitter, which blocks write when too far ahead class CWriter : public CSimpleInterface, IThreaded @@ -217,7 +218,10 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf assertex(((unsigned)-1) != connectedOutputCount); activeOutputCount = connectedOutputCount; - PARENT::start(); + { + ActivityTimer t(slaveTimerStats, queryTimeActivities()); + PARENT::start(); + } initMetaInfo(cachedMetaInfo); cachedMetaInfo.suppressLookAhead = spill; // only suppress downstream lookaheads if this is a spilling splitter @@ -299,6 +303,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf } inline const void *nextRow(unsigned outIdx, rowcount_t current) { + ActivityTimer t(slaveTimerStats, queryTimeActivities()); if (1 == activeOutputCount) // will be true, if only 1 input connected, or only 1 input was active (others stopped) when it started reading return inputStream->nextRow(); if (recsReady == current && writeAheadException.get()) @@ -412,11 +417,13 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf virtual void paged() { pagedOut = true; } virtual void blocked() { + blockedActivityTimer.reset(new BlockedActivityTimer(slaveTimerStats, queryTimeActivities())); writeBlocked = true; // Prevent other users getting beyond checking recsReady in writeahead() writeAheadCrit.leave(); } virtual void unblocked() { + blockedActivityTimer.reset(nullptr); writeAheadCrit.enter(); writeBlocked = false; if (stalledWriters.ordinality())