From a2a6508554875df2647f8e928cecc6494a3ed7d5 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Thu, 7 Nov 2024 10:40:10 +0000 Subject: [PATCH] HPCC-32930 Changes following code review Signed-off-by: Shamser Ahmed --- thorlcr/activities/lookupjoin/thlookupjoinslave.cpp | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 6b183cdef8f..ad202800c13 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -1445,14 +1445,17 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } HTHELPER *queryTable() { return table; } IBitSet *queryRhsChannelStopSet() { dbgassertex(0 == queryJobChannelNumber()); return rhsChannelStop; } - void startLeftInput() + void startLeftInput(bool async=false) { try { + if (async) { LookAheadTimer t(slaveTimerStats, timeActivities); startInput(0); } + else + startInput(0); if (ensureStartFTLookAhead(0)) setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, this), false); left.set(inputStream); // can be replaced by loader stream @@ -1462,6 +1465,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, leftexception.setown(e); } } + void startLeftInputAsync() + { + startLeftInput(true); + } virtual bool isRhsConstant() const { return rhsConstant; } // IThorSlaveActivity overloaded methods @@ -1533,6 +1540,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } virtual void start() override { + ActivityTimer s(slaveTimerStats, timeActivities); joined = 0; joinCounter = 0; candidateCounter = 0; @@ -1565,10 +1573,9 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } else { - CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInput, this)); + CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInputAsync, this)); try { - ActivityTimer timer(slaveTimerStats, timeActivities); startInput(1); rhsStartedBefore = true; }