Skip to content

Commit

Permalink
Merge pull request #18767 from jakesmith/metainfo-fastthrough
Browse files Browse the repository at this point in the history
HPCC-32060 Rationalize fastThrough lookahead logic

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jun 14, 2024
2 parents 8e61f84 + 42e4591 commit d0f1f8a
Show file tree
Hide file tree
Showing 15 changed files with 74 additions and 23 deletions.
1 change: 0 additions & 1 deletion thorlcr/activities/aggregate/thaggregateslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ class AggregateSlaveActivity : public AggregateSlaveBase
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
initMetaInfo(info);
info.singleRowOutput = true;
info.totalRowsMin=1;
info.totalRowsMax=1;
}
Expand Down
1 change: 0 additions & 1 deletion thorlcr/activities/choosesets/thchoosesetsslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ class ChooseSetsPlusActivity : public CSlaveActivity, implements ILookAheadStopN
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
initMetaInfo(info);
info.buffersInput = true;
info.isSequential = true;
info.canReduceNumRows = true;
info.canBufferInput = true;
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/countproject/thcountprojectslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class CountProjectActivity : public BaseCountProjectActivity, implements ILookAh
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
initMetaInfo(info);
info.buffersInput = true;
info.canBufferInput = true;
info.isSequential = true;
calcMetaInfoSize(info, queryInput(0));
}
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/enth/thenthslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class BaseEnthActivity : public CSlaveActivity, implements ILookAheadStopNotify
void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
initMetaInfo(info);
info.buffersInput = true;
info.canBufferInput = true;
info.canReduceNumRows = true;
calcMetaInfoSize(info, input);
}
Expand Down
10 changes: 10 additions & 0 deletions thorlcr/activities/filter/thfilterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ class CFilterSlaveActivity : public CFilterSlaveActivityBase, public CThorSteppa
CThorSteppable::setInputStream(index, input, consumerOrdered);
}
virtual IInputSteppingMeta *querySteppingMeta() { return CThorSteppable::inputStepping; }
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
PARENT::getMetaInfo(info);
info.fastThrough = true;
}
};

class CFilterProjectSlaveActivity : public CFilterSlaveActivityBase
Expand Down Expand Up @@ -230,6 +235,11 @@ class CFilterProjectSlaveActivity : public CFilterSlaveActivityBase
anyThisGroup = false;
return NULL;
}
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
PARENT::getMetaInfo(info);
info.fastThrough = true;
}
};

class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorSteppable
Expand Down
5 changes: 5 additions & 0 deletions thorlcr/activities/firstn/thfirstnslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ class CFirstNSlaveLocal : public CFirstNSlaveBase
}
return NULL;
}
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
PARENT::getMetaInfo(info);
info.fastThrough = true;
}
};

class CFirstNSlaveGrouped : public CFirstNSlaveBase
Expand Down
1 change: 0 additions & 1 deletion thorlcr/activities/limit/thlimitslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ class CLimitSlaveActivityBase : public CSlaveActivity
{
initMetaInfo(info);
info.canReduceNumRows = true;
info.canBufferInput = false;
info.totalRowsMax = rowLimit;
calcMetaInfoSize(info, queryInput(0));
}
Expand Down
3 changes: 2 additions & 1 deletion thorlcr/activities/msort/thgroupsortslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class CLocalSortSlaveActivity : public CSlaveActivity
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
initMetaInfo(info);
info.buffersInput = true;
info.canBufferInput = true;
calcMetaInfoSize(info, queryInput(0));
}
};
Expand Down Expand Up @@ -183,6 +183,7 @@ class CSortedSlaveActivity : public CSlaveActivity, public CThorSteppable
{
initMetaInfo(info);
calcMetaInfoSize(info, queryInput(0));
info.fastThrough = true;
}
// steppable
virtual void setInputStream(unsigned index, CThorInput &input, bool consumerOrdered) override
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/msort/thmsortslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class MSortSlaveActivity : public CSlaveActivity
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
initMetaInfo(info);
info.buffersInput = true;
info.canBufferInput = true;
info.unknownRowsOutput = false; // shuffles rows
if (totalrows!=RCUNSET) { // NB totalrows not available until after start
info.totalRowsMin = totalrows;
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf

PARENT::start();
initMetaInfo(cachedMetaInfo);
cachedMetaInfo.suppressLookAhead = true;

calcMetaInfoSize(cachedMetaInfo, queryInput(0));

ForEachItemIn(o, outputs)
Expand Down
4 changes: 3 additions & 1 deletion thorlcr/activities/pull/thpullslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class PullSlaveActivity : public CSlaveActivity
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
initMetaInfo(info);
info.buffersInput = true;
info.canBufferInput = true;
info.suppressLookAhead = true;
info.fastThrough = true;
calcMetaInfoSize(info, queryInput(0));
}
};
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/selfjoin/thselfjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class SelfJoinSlaveActivity : public CSlaveActivity
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
{
initMetaInfo(info);
info.buffersInput = true;
info.canBufferInput = true;
info.unknownRowsOutput = true;
}
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
Expand Down
43 changes: 38 additions & 5 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ bool CThorInput::isFastThrough() const
{
return itdl->queryFromActivity()->isFastThrough();
}

bool CThorInput::suppressLookAhead() const
{
return itdl->queryFromActivity()->suppressLookAhead();
}

//

CSlaveActivity::CSlaveActivity(CGraphElementBase *_container, const StatisticsMapping &statsMapping)
Expand Down Expand Up @@ -251,28 +257,32 @@ bool CSlaveActivity::isInputFastThrough(unsigned index) const
return input.isFastThrough();
}

/* If fastThrough, return false.
* If !fastThrough (indicating needs look ahead) and has existing lookahead, start it, return false.
* If !fastThrough (indicating needs look ahead) and no existing lookahead, return true, caller will install.
/* If fastThrough or suppressLookAhead, return false.
* If not (indicating needs look ahead) and has existing lookahead, start it, return false.
* If not (indicating needs look ahead) and no existing lookahead, return true, caller will install.
*
* NB: only return true if new lookahead needs installing.
*/
bool CSlaveActivity::ensureStartFTLookAhead(unsigned index)
{
CThorInput &input = inputs.item(index);
if (input.isFastThrough())
if (input.isFastThrough() || input.suppressLookAhead())
return false; // no look ahead required
else
{
// look ahead required
if (input.hasLookAhead())
{
//ActPrintLog("Already has lookahead");
// no change, start existing look ahead
startLookAhead(index);
return false; // no [new] look ahead required
}
else
{
//ActPrintLog("lookahead will be inserted");
return true; // new look ahead required
}
}
}

Expand Down Expand Up @@ -308,7 +318,7 @@ bool CSlaveActivity::canStall() const
getMetaInfo(info);
if (info.canStall)
return true;
if (info.isSource || info.buffersInput || info.canBufferInput)
if (info.isSource || info.canBufferInput)
return false;

for (unsigned i=0; i<queryNumInputs(); i++)
Expand All @@ -324,6 +334,29 @@ bool CSlaveActivity::canStall() const
return false;
}

// check if activity is suppressLookAhead, or if fastThrough, check that inputs are suppressLookAhead
bool CSlaveActivity::suppressLookAhead() const
{
if (!hasStarted())
return true;
ThorDataLinkMetaInfo info;
getMetaInfo(info);
if (info.suppressLookAhead)
return true;
if (!info.fastThrough || info.canStall) // NB: JIC - but should never be marked fastThrough==true if canStall==true
return false;
for (unsigned i=0; i<queryNumInputs(); i++)
{
IThorDataLink *input = queryInput(i);
if (input && queryInputStarted(i))
{
CSlaveActivity *inputAct = input->queryFromActivity();
if (!inputAct->suppressLookAhead())
return false;
}
}
return true;
}


IStrandJunction *CSlaveActivity::getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks)
Expand Down
2 changes: 2 additions & 0 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ class CThorInput : public CSimpleInterfaceOf<IInterface>
stopped = true;
}
bool isFastThrough() const;
bool suppressLookAhead() const;
};
typedef IArrayOf<CThorInput> CThorInputArray;

Expand Down Expand Up @@ -266,6 +267,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
void debugRequest(unsigned edgeIdx, MemoryBuffer &msg);
bool canStall() const;
bool isFastThrough() const;
bool suppressLookAhead() const;

// IThorDataLink
virtual CSlaveActivity *queryFromActivity() override { return this; }
Expand Down
17 changes: 8 additions & 9 deletions thorlcr/slave/slave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@ struct ThorDataLinkMetaInfo
__int64 totalRowsMax = -1; // set to -1 if not known
offset_t spilled = (offset_t)-1; // amount "spilled" to disk (approx) (offset_t)-1 for not known

bool isSource = false;
bool isSequential = false;
bool canStall = false;
bool fastThrough = false;
bool buffersInput = false;
bool canBufferInput = false;
bool singleRowOutput = false;
bool canIncreaseNumRows = false;
bool canReduceNumRows = false;
bool isSource = false; // A source activity (disk read, index read, etc)
bool isSequential = false; // There is a sequential nature to the implementation, workers dependent on previous worker (e.g. global ChooseN)
bool canStall = false; // The activity may stall if its outputs are not pulled on each worker
bool fastThrough = false; // The activity will return rows quickly if it can (does not mean it can't block on its input)
bool canBufferInput = false; // The activity caches input rows
bool suppressLookAhead = false; // Downstream activities should avoid inserting lookaheads
bool canIncreaseNumRows = false; // The activity can produce more rows than it reads from its input (e.g. NORMALIZE)
bool canReduceNumRows = false; // The activity can produce less rows than it reads from its input (e.g. ENTH)
bool unknownRowsOutput = false; // cannot use input to deduce total
offset_t byteTotal = (offset_t)-1; // total (uncompressed) byte count of all rows
};
Expand Down

0 comments on commit d0f1f8a

Please sign in to comment.