From 06a9ffea3c049f68b09f5306ff5107855c8ff5d7 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 5 Nov 2024 14:17:13 +0000 Subject: [PATCH] HPCC-32946 Capture and report lookahead timings for hash distribute Signed-off-by: Shamser Ahmed --- thorlcr/activities/fetch/thfetchslave.cpp | 4 ++-- .../hashdistrib/thhashdistribslave.cpp | 18 +++++++++++------- .../hashdistrib/thhashdistribslave.ipp | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/thorlcr/activities/fetch/thfetchslave.cpp b/thorlcr/activities/fetch/thfetchslave.cpp index 96a6d75bfb6..1eabc0d8a3a 100644 --- a/thorlcr/activities/fetch/thfetchslave.cpp +++ b/thorlcr/activities/fetch/thfetchslave.cpp @@ -80,7 +80,7 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch bool abortSoon; mptag_t tag; Owned keyOutStream; - CActivityBase &owner; + CSlaveActivity &owner; Linked keyRowIf, fetchRowIf; StringAttr logicalFilename; @@ -124,7 +124,7 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CFetchStream(CActivityBase &_owner, IThorRowInterfaces *_keyRowIf, IThorRowInterfaces *_fetchRowIf, bool &_abortSoon, const char *_logicalFilename, CPartDescriptorArray &_parts, unsigned _offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *_iFetchHandler, mptag_t _tag, IExpander *_eexp) + CFetchStream(CSlaveActivity &_owner, IThorRowInterfaces *_keyRowIf, IThorRowInterfaces *_fetchRowIf, bool &_abortSoon, const char *_logicalFilename, CPartDescriptorArray &_parts, unsigned _offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *_iFetchHandler, mptag_t _tag, IExpander *_eexp) : owner(_owner), keyRowIf(_keyRowIf), fetchRowIf(_fetchRowIf), abortSoon(_abortSoon), logicalFilename(_logicalFilename), iFetchHandler(_iFetchHandler), offsetCount(_offsetCount), tag(_tag), eexp(_eexp) { diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index b2e5f034417..a2fc290cf8c 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -859,7 +859,11 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl } if (aborted) break; - const void *row = input->ungroupedNextRow(); + const void *row; + { + LookAheadTimer t(owner.activity->getActivityTimerAccumulator(), owner.activity->queryTimeActivities()); + row = input->ungroupedNextRow(); + } if (!row) break; @@ -1060,7 +1064,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl ::ActPrintLogEx(&activity->queryContainer(), e, thorlog_all, MCexception(e), "%s", msg.str()); } protected: - CActivityBase *activity; + CSlaveActivity *activity; size32_t inputBufferSize, pullBufferSize; unsigned writerPoolSize; unsigned self; @@ -1078,7 +1082,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl public: IMPLEMENT_IINTERFACE_USING(CInterface); - CDistributorBase(CActivityBase *_activity, bool _doDedup, bool _isall, IStopInput *_istop, const char *_id) + CDistributorBase(CSlaveActivity *_activity, bool _doDedup, bool _isall, IStopInput *_istop, const char *_id) : activity(_activity), recvthread(this), sendthread(this), sender(*this), id(_id) { aborted = connected = false; @@ -1530,7 +1534,7 @@ class CRowDistributor: public CDistributorBase ICommunicator &comm; bool stopping; public: - CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, bool isAll, IStopInput *istop, const char *id) + CRowDistributor(CSlaveActivity *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, bool isAll, IStopInput *istop, const char *id) : CDistributorBase(activity, doDedup, isAll, istop, id), comm(_comm), tag(_tag) { stopping = false; @@ -1791,7 +1795,7 @@ class CRowPullDistributor: public CDistributorBase selfdone.reinit(); } public: - CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id) + CRowPullDistributor(CSlaveActivity *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id) : CDistributorBase(activity, doDedup, false, istop, id), comm(_comm), tag(_tag) { pull = true; @@ -2086,12 +2090,12 @@ class CRowPullDistributor: public CDistributorBase //================================================================================================== -IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, bool isAll, IStopInput *istop, const char *id) +IHashDistributor *createHashDistributor(CSlaveActivity *activity, ICommunicator &comm, mptag_t tag, bool doDedup, bool isAll, IStopInput *istop, const char *id) { return new CRowDistributor(activity, comm, tag, doDedup, isAll, istop, id); } -IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id=NULL) +IHashDistributor *createPullHashDistributor(CSlaveActivity *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id=NULL) { return new CRowPullDistributor(activity, comm, tag, doDedup, istop, id); } diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp index 862ec32bdd9..900d2f0bf11 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp @@ -36,7 +36,7 @@ interface IHashDistributor : extends IInterface interface IStopInput; IHashDistributor *createHashDistributor( - CActivityBase *activity, + CSlaveActivity *activity, ICommunicator &comm, mptag_t tag, bool dedup,