From 42e4591dd0b03cb83c3dc92dae27f72560449b90 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Thu, 13 Jun 2024 16:33:12 +0100 Subject: [PATCH] HPCC-32060 Rationalize fastThrough lookahead logic Modify info meta flags and logic, to avoid some unnecessary lookaheads Signed-off-by: Jake Smith --- .../activities/aggregate/thaggregateslave.cpp | 1 - .../choosesets/thchoosesetsslave.cpp | 1 - .../countproject/thcountprojectslave.cpp | 2 +- thorlcr/activities/enth/thenthslave.cpp | 2 +- thorlcr/activities/filter/thfilterslave.cpp | 10 +++++ thorlcr/activities/firstn/thfirstnslave.cpp | 5 +++ thorlcr/activities/limit/thlimitslave.cpp | 1 - thorlcr/activities/msort/thgroupsortslave.cpp | 3 +- thorlcr/activities/msort/thmsortslave.cpp | 2 +- .../activities/nsplitter/thnsplitterslave.cpp | 2 + thorlcr/activities/pull/thpullslave.cpp | 4 +- .../activities/selfjoin/thselfjoinslave.cpp | 2 +- thorlcr/graph/thgraphslave.cpp | 43 ++++++++++++++++--- thorlcr/graph/thgraphslave.hpp | 2 + thorlcr/slave/slave.hpp | 17 ++++---- 15 files changed, 74 insertions(+), 23 deletions(-) diff --git a/thorlcr/activities/aggregate/thaggregateslave.cpp b/thorlcr/activities/aggregate/thaggregateslave.cpp index 48c6df103f9..978c9611707 100644 --- a/thorlcr/activities/aggregate/thaggregateslave.cpp +++ b/thorlcr/activities/aggregate/thaggregateslave.cpp @@ -196,7 +196,6 @@ class AggregateSlaveActivity : public AggregateSlaveBase virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { initMetaInfo(info); - info.singleRowOutput = true; info.totalRowsMin=1; info.totalRowsMax=1; } diff --git a/thorlcr/activities/choosesets/thchoosesetsslave.cpp b/thorlcr/activities/choosesets/thchoosesetsslave.cpp index 13623a5bff5..43029e92618 100644 --- a/thorlcr/activities/choosesets/thchoosesetsslave.cpp +++ b/thorlcr/activities/choosesets/thchoosesetsslave.cpp @@ -308,7 +308,6 @@ class ChooseSetsPlusActivity : public CSlaveActivity, implements ILookAheadStopN virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { initMetaInfo(info); - info.buffersInput = true; info.isSequential = true; info.canReduceNumRows = true; info.canBufferInput = true; diff --git a/thorlcr/activities/countproject/thcountprojectslave.cpp b/thorlcr/activities/countproject/thcountprojectslave.cpp index 0727127b3bb..58b27bf9d62 100644 --- a/thorlcr/activities/countproject/thcountprojectslave.cpp +++ b/thorlcr/activities/countproject/thcountprojectslave.cpp @@ -235,7 +235,7 @@ class CountProjectActivity : public BaseCountProjectActivity, implements ILookAh virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { initMetaInfo(info); - info.buffersInput = true; + info.canBufferInput = true; info.isSequential = true; calcMetaInfoSize(info, queryInput(0)); } diff --git a/thorlcr/activities/enth/thenthslave.cpp b/thorlcr/activities/enth/thenthslave.cpp index b91ca33a453..1a5529c2a14 100644 --- a/thorlcr/activities/enth/thenthslave.cpp +++ b/thorlcr/activities/enth/thenthslave.cpp @@ -110,7 +110,7 @@ class BaseEnthActivity : public CSlaveActivity, implements ILookAheadStopNotify void getMetaInfo(ThorDataLinkMetaInfo &info) const override { initMetaInfo(info); - info.buffersInput = true; + info.canBufferInput = true; info.canReduceNumRows = true; calcMetaInfoSize(info, input); } diff --git a/thorlcr/activities/filter/thfilterslave.cpp b/thorlcr/activities/filter/thfilterslave.cpp index a7bd4635428..bf475cb01c4 100644 --- a/thorlcr/activities/filter/thfilterslave.cpp +++ b/thorlcr/activities/filter/thfilterslave.cpp @@ -157,6 +157,11 @@ class CFilterSlaveActivity : public CFilterSlaveActivityBase, public CThorSteppa CThorSteppable::setInputStream(index, input, consumerOrdered); } virtual IInputSteppingMeta *querySteppingMeta() { return CThorSteppable::inputStepping; } + virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override + { + PARENT::getMetaInfo(info); + info.fastThrough = true; + } }; class CFilterProjectSlaveActivity : public CFilterSlaveActivityBase @@ -230,6 +235,11 @@ class CFilterProjectSlaveActivity : public CFilterSlaveActivityBase anyThisGroup = false; return NULL; } + virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override + { + PARENT::getMetaInfo(info); + info.fastThrough = true; + } }; class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorSteppable diff --git a/thorlcr/activities/firstn/thfirstnslave.cpp b/thorlcr/activities/firstn/thfirstnslave.cpp index 2140ab9a67b..0973c014b77 100644 --- a/thorlcr/activities/firstn/thfirstnslave.cpp +++ b/thorlcr/activities/firstn/thfirstnslave.cpp @@ -124,6 +124,11 @@ class CFirstNSlaveLocal : public CFirstNSlaveBase } return NULL; } + virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override + { + PARENT::getMetaInfo(info); + info.fastThrough = true; + } }; class CFirstNSlaveGrouped : public CFirstNSlaveBase diff --git a/thorlcr/activities/limit/thlimitslave.cpp b/thorlcr/activities/limit/thlimitslave.cpp index ffb55a2457d..2e0231e4db3 100644 --- a/thorlcr/activities/limit/thlimitslave.cpp +++ b/thorlcr/activities/limit/thlimitslave.cpp @@ -75,7 +75,6 @@ class CLimitSlaveActivityBase : public CSlaveActivity { initMetaInfo(info); info.canReduceNumRows = true; - info.canBufferInput = false; info.totalRowsMax = rowLimit; calcMetaInfoSize(info, queryInput(0)); } diff --git a/thorlcr/activities/msort/thgroupsortslave.cpp b/thorlcr/activities/msort/thgroupsortslave.cpp index 0e7c97898c8..4e6aac95e57 100644 --- a/thorlcr/activities/msort/thgroupsortslave.cpp +++ b/thorlcr/activities/msort/thgroupsortslave.cpp @@ -106,7 +106,7 @@ class CLocalSortSlaveActivity : public CSlaveActivity virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { initMetaInfo(info); - info.buffersInput = true; + info.canBufferInput = true; calcMetaInfoSize(info, queryInput(0)); } }; @@ -183,6 +183,7 @@ class CSortedSlaveActivity : public CSlaveActivity, public CThorSteppable { initMetaInfo(info); calcMetaInfoSize(info, queryInput(0)); + info.fastThrough = true; } // steppable virtual void setInputStream(unsigned index, CThorInput &input, bool consumerOrdered) override diff --git a/thorlcr/activities/msort/thmsortslave.cpp b/thorlcr/activities/msort/thmsortslave.cpp index a4289e33d0e..14acc3dedb4 100644 --- a/thorlcr/activities/msort/thmsortslave.cpp +++ b/thorlcr/activities/msort/thmsortslave.cpp @@ -206,7 +206,7 @@ class MSortSlaveActivity : public CSlaveActivity virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { initMetaInfo(info); - info.buffersInput = true; + info.canBufferInput = true; info.unknownRowsOutput = false; // shuffles rows if (totalrows!=RCUNSET) { // NB totalrows not available until after start info.totalRowsMin = totalrows; diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index 56c5dbe0c91..56158ae2381 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -208,6 +208,8 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf PARENT::start(); initMetaInfo(cachedMetaInfo); + cachedMetaInfo.suppressLookAhead = true; + calcMetaInfoSize(cachedMetaInfo, queryInput(0)); ForEachItemIn(o, outputs) diff --git a/thorlcr/activities/pull/thpullslave.cpp b/thorlcr/activities/pull/thpullslave.cpp index f0279539eb5..ef0daa33064 100644 --- a/thorlcr/activities/pull/thpullslave.cpp +++ b/thorlcr/activities/pull/thpullslave.cpp @@ -54,7 +54,9 @@ class PullSlaveActivity : public CSlaveActivity virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { initMetaInfo(info); - info.buffersInput = true; + info.canBufferInput = true; + info.suppressLookAhead = true; + info.fastThrough = true; calcMetaInfoSize(info, queryInput(0)); } }; diff --git a/thorlcr/activities/selfjoin/thselfjoinslave.cpp b/thorlcr/activities/selfjoin/thselfjoinslave.cpp index 075d28c9390..8a951a8fe06 100644 --- a/thorlcr/activities/selfjoin/thselfjoinslave.cpp +++ b/thorlcr/activities/selfjoin/thselfjoinslave.cpp @@ -224,7 +224,7 @@ class SelfJoinSlaveActivity : public CSlaveActivity virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override { initMetaInfo(info); - info.buffersInput = true; + info.canBufferInput = true; info.unknownRowsOutput = true; } virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index b295efeec64..3974042bc12 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -124,6 +124,12 @@ bool CThorInput::isFastThrough() const { return itdl->queryFromActivity()->isFastThrough(); } + +bool CThorInput::suppressLookAhead() const +{ + return itdl->queryFromActivity()->suppressLookAhead(); +} + // CSlaveActivity::CSlaveActivity(CGraphElementBase *_container, const StatisticsMapping &statsMapping) @@ -251,28 +257,32 @@ bool CSlaveActivity::isInputFastThrough(unsigned index) const return input.isFastThrough(); } -/* If fastThrough, return false. - * If !fastThrough (indicating needs look ahead) and has existing lookahead, start it, return false. - * If !fastThrough (indicating needs look ahead) and no existing lookahead, return true, caller will install. +/* If fastThrough or suppressLookAhead, return false. + * If not (indicating needs look ahead) and has existing lookahead, start it, return false. + * If not (indicating needs look ahead) and no existing lookahead, return true, caller will install. * * NB: only return true if new lookahead needs installing. */ bool CSlaveActivity::ensureStartFTLookAhead(unsigned index) { CThorInput &input = inputs.item(index); - if (input.isFastThrough()) + if (input.isFastThrough() || input.suppressLookAhead()) return false; // no look ahead required else { // look ahead required if (input.hasLookAhead()) { + //ActPrintLog("Already has lookahead"); // no change, start existing look ahead startLookAhead(index); return false; // no [new] look ahead required } else + { + //ActPrintLog("lookahead will be inserted"); return true; // new look ahead required + } } } @@ -308,7 +318,7 @@ bool CSlaveActivity::canStall() const getMetaInfo(info); if (info.canStall) return true; - if (info.isSource || info.buffersInput || info.canBufferInput) + if (info.isSource || info.canBufferInput) return false; for (unsigned i=0; iqueryFromActivity(); + if (!inputAct->suppressLookAhead()) + return false; + } + } + return true; +} IStrandJunction *CSlaveActivity::getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index c0239e4e027..f6b73c912c1 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -186,6 +186,7 @@ class CThorInput : public CSimpleInterfaceOf stopped = true; } bool isFastThrough() const; + bool suppressLookAhead() const; }; typedef IArrayOf CThorInputArray; @@ -266,6 +267,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres void debugRequest(unsigned edgeIdx, MemoryBuffer &msg); bool canStall() const; bool isFastThrough() const; + bool suppressLookAhead() const; // IThorDataLink virtual CSlaveActivity *queryFromActivity() override { return this; } diff --git a/thorlcr/slave/slave.hpp b/thorlcr/slave/slave.hpp index 5b882d43091..1ebe27b7824 100644 --- a/thorlcr/slave/slave.hpp +++ b/thorlcr/slave/slave.hpp @@ -41,15 +41,14 @@ struct ThorDataLinkMetaInfo __int64 totalRowsMax = -1; // set to -1 if not known offset_t spilled = (offset_t)-1; // amount "spilled" to disk (approx) (offset_t)-1 for not known - bool isSource = false; - bool isSequential = false; - bool canStall = false; - bool fastThrough = false; - bool buffersInput = false; - bool canBufferInput = false; - bool singleRowOutput = false; - bool canIncreaseNumRows = false; - bool canReduceNumRows = false; + bool isSource = false; // A source activity (disk read, index read, etc) + bool isSequential = false; // There is a sequential nature to the implementation, workers dependent on previous worker (e.g. global ChooseN) + bool canStall = false; // The activity may stall if its outputs are not pulled on each worker + bool fastThrough = false; // The activity will return rows quickly if it can (does not mean it can't block on its input) + bool canBufferInput = false; // The activity caches input rows + bool suppressLookAhead = false; // Downstream activities should avoid inserting lookaheads + bool canIncreaseNumRows = false; // The activity can produce more rows than it reads from its input (e.g. NORMALIZE) + bool canReduceNumRows = false; // The activity can produce less rows than it reads from its input (e.g. ENTH) bool unknownRowsOutput = false; // cannot use input to deduce total offset_t byteTotal = (offset_t)-1; // total (uncompressed) byte count of all rows };