Skip to content

Commit

Permalink
Merge pull request #18378 from ghalliday/newContext
Browse files Browse the repository at this point in the history
HPCC-31389 HPCC Refactor the way per-thread job information is initialised

Reviewed-by: Jake Smith <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Mar 8, 2024
2 parents 91ea460 + 4f24b05 commit 16f61e0
Show file tree
Hide file tree
Showing 133 changed files with 429 additions and 374 deletions.
6 changes: 3 additions & 3 deletions common/thorhelper/persistent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,10 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele
}

//Thread
virtual void start() override
virtual void start(bool inheritThreadContext) override
{
m_selectHandler->start();
Thread::start();
Thread::start(inheritThreadContext);
PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d started with max idle time %d and max requests %d", m_id, m_maxIdleTime, m_maxReqs);
}

Expand Down Expand Up @@ -462,6 +462,6 @@ int CPersistentHandler::CurID = 0;
IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
{
Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel, enableDoNotReuseList);
handler->start();
handler->start(false);
return handler.getClear();
}
2 changes: 1 addition & 1 deletion common/thorhelper/thorpipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class CPipeErrorHelper : public Thread, implements IPipeErrorHelper
void run(IPipeProcess *_pipe)
{
pipe.set(_pipe);
this->start();
this->start(true);
}

void wait()
Expand Down
4 changes: 2 additions & 2 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class BlackLister : public CInterface, implements IThreadFactory

BlackLister()
{
pool.setown(createThreadPool("SocketBlacklistPool", this, NULL, 0, 0));
pool.setown(createThreadPool("SocketBlacklistPool", this, false, nullptr, 0, 0));
}

ISocket* connect(SocketEndpoint &ep,
Expand Down Expand Up @@ -1244,7 +1244,7 @@ class CWSCHelper : implements IWSCHelper, public CInterface
complete = aborted = timeLimitExceeded = false;

ForEachItemIn(i,threads)
threads.item(i).start();
threads.item(i).start(true); // inherit context because the threads always have a shorter lifetime than the caller.
}
void abort()
{
Expand Down
4 changes: 2 additions & 2 deletions common/thorhelper/thorstrand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CStrandBarrier : public CInterfaceOf<IStrandBarrier>
{
CThreaded * thread = new CThreaded("Strand", &strand);
threads.append(*thread);
thread->start();
thread->start(true);
}

virtual void waitForStrands()
Expand Down Expand Up @@ -109,7 +109,7 @@ class CStrandJunction : public CInterfaceOf<IStrandJunction>
{
CThreaded * thread = new CThreaded("ReadAheadThread", &mainthread);
threads.append(*thread);
thread->start();
thread->start(true);
}

void processConsumerStop()
Expand Down
2 changes: 1 addition & 1 deletion common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14433,7 +14433,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
}
} pollthread(jq, &workunit, timelimit*1000);

pollthread.start();
pollthread.start(false);

{
Owned<IWorkUnit> w = &workunit.lock();
Expand Down
2 changes: 1 addition & 1 deletion dali/base/daaudit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class CDaliAuditServer: public IDaliServer, public Thread

void start()
{
Thread::start();
Thread::start(false);
}

void ready()
Expand Down
2 changes: 1 addition & 1 deletion dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10840,7 +10840,7 @@ class CDaliDFSServer: public Thread, public CTransactionLogTracker, implements I

void start()
{
Thread::start();
Thread::start(false);
}

void ready()
Expand Down
2 changes: 1 addition & 1 deletion dali/base/dadiags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class CDaliDiagnosticsServer: public IDaliServer, public Thread

void start()
{
Thread::start();
Thread::start(false);
}

void ready()
Expand Down
6 changes: 3 additions & 3 deletions dali/base/danqs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class CNamedQueueSubscriptionProxy: public Thread
{
owner = _owner;
finished = false;
start();
start(false);
}

~CNamedQueueSubscriptionProxy();
Expand Down Expand Up @@ -292,7 +292,7 @@ class CQueueChannel: implements IQueueChannel, public CInterface
return 0;
}
} timeoutthread(this,timeout);
timeoutthread.start();
timeoutthread.start(false);
bool ret = handler.get(buf);
timeoutthread.sem.signal();
timeoutthread.join();
Expand Down Expand Up @@ -493,7 +493,7 @@ class CDaliNamedQueueServer: public IDaliServer, public Thread, implements IConn

void start()
{
Thread::start();
Thread::start(false);
}

void ready()
Expand Down
10 changes: 5 additions & 5 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ class CDeltaWriter : implements IThreaded
PROGLOG("%s", msg.str());

if ((transactionQueueLimit > 1) && (transactionMaxMem > 0))
threaded.init(this);
threaded.init(this, false);
else
PROGLOG("All transactions will be committed synchronously");
}
Expand Down Expand Up @@ -5075,7 +5075,7 @@ class CLightCoalesceThread : implements ICoalesce, public CInterface
CLightCoalesceThread *coalesce;
public:
CThreaded() : Thread("CLightCoalesceThread") { coalesce = NULL; }
void init(CLightCoalesceThread *_coalesce) { coalesce = _coalesce; start(); }
void init(CLightCoalesceThread *_coalesce) { coalesce = _coalesce; start(false); }
virtual int run() { coalesce->threadmain(); return 1; }
} threaded;
public:
Expand Down Expand Up @@ -7269,7 +7269,7 @@ unsigned CCovenSDSManager::queryCount(const char *xpath)

void CCovenSDSManager::start()
{
server.start();
server.start(false);
if (coalesce) coalesce->start();
}

Expand Down Expand Up @@ -8367,7 +8367,7 @@ void CCovenSDSManager::handleNotify(CSubscriberContainerBase *_subscriber, Memor
if (!notifyPool)
{
CNotifyPoolFactory *factory = new CNotifyPoolFactory;
notifyPool.setown(createThreadPool("SDS Notification Pool", factory, this, SUBNTFY_POOL_SIZE));
notifyPool.setown(createThreadPool("SDS Notification Pool", factory, false, this, SUBNTFY_POOL_SIZE));
factory->Release();
}

Expand Down Expand Up @@ -8699,7 +8699,7 @@ void CCovenSDSManager::startNotification(IPropertyTree &changeTree, CPTStack &st
if (!scanNotifyPool)
{
CScanNotifyPoolFactory *factory = new CScanNotifyPoolFactory;
scanNotifyPool.setown(createThreadPool("SDS Scan-Notification Pool", factory, this, SUBSCAN_POOL_SIZE));
scanNotifyPool.setown(createThreadPool("SDS Scan-Notification Pool", factory, false, this, SUBSCAN_POOL_SIZE));
factory->Release();
}

Expand Down
8 changes: 4 additions & 4 deletions dali/base/dasess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class CdelayedTerminate: public Thread // slightly obfuscated stop code
CdelayedTerminate(byte _err)
{
err = _err;
start();
start(false);
Release();
Sleep(100);
}
Expand Down Expand Up @@ -1176,7 +1176,7 @@ class CLdapWorkItem : public Thread
ret = CLDAPE_ldapfailure;
if (!running) {
running = true;
Thread::start();
Thread::start(false);
}
contsem.signal();
}
Expand Down Expand Up @@ -1283,7 +1283,7 @@ class CCovenSessionManager: public CSessionManagerBase, implements ISessionManag

void start()
{
sessionrequestserver.start();
sessionrequestserver.start(false);
}

void stop()
Expand Down Expand Up @@ -1927,7 +1927,7 @@ bool registerClientProcess(ICommunicator *comm, IGroup *& retcoven,unsigned time
return 0;
}
} *t = new cThread(comm,r,remaining);
t->start();
t->start(false);
t->sem.wait(remaining);
ok = t->ok;
if (t->exc.get()) {
Expand Down
4 changes: 2 additions & 2 deletions dali/base/dasubs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class CDaliPublisherServer: public IDaliServer, public Thread, public CDaliPubli
void start()
{
running = true;
Thread::start();
Thread::start(false);
}
void ready()
{
Expand Down Expand Up @@ -552,7 +552,7 @@ class CDaliPublisherClient: public Thread, public CDaliPublisher
: Thread("CDaliPublisherClient")
{
running = true;
start();
start(false);
}

~CDaliPublisherClient()
Expand Down
2 changes: 1 addition & 1 deletion dali/base/dautils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2442,7 +2442,7 @@ class CTimedCache

void start()
{
thread.start();
thread.start(false);
}

void stop()
Expand Down
4 changes: 2 additions & 2 deletions dali/dalidiag/dalidiag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ void nqPingPong(const char *q,const char *q2)
threads[0].num = 1;
threads[1].q = q;
threads[1].num = 2;
threads[0].start();
threads[1].start();
threads[0].start(false);
threads[1].start(false);
threads[0].join();
threads[1].join();
}
Expand Down
26 changes: 13 additions & 13 deletions dali/datest/datest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ class TSDSTestPool : public CInterface, implements IThreadFactory
class CQPutTest : public Thread
{
public:
CQPutTest() : Thread("CQPutTest") { start(); }
CQPutTest() : Thread("CQPutTest") { start(false); }
virtual int run()
{
try {
Expand All @@ -1193,7 +1193,7 @@ class CQPutTest : public Thread
class CQGetTest : public Thread
{
public:
CQGetTest() : Thread("CQPutTest") { start(); }
CQGetTest() : Thread("CQPutTest") { start(false); }
virtual int run()
{
try {
Expand Down Expand Up @@ -1319,7 +1319,7 @@ class CChange : public Thread
conn.setown(querySDS().connect(_path, myProcessSession(), RTM_CREATE_QUERY, 1000000));

id1 = id2 = 0;
start();
start(false);
}
virtual int run()
{
Expand Down Expand Up @@ -1407,7 +1407,7 @@ void TestStress()
getTest.setown(new CQGetTest());
}
TSDSTestPool poolFactory;
Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory);
Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, false, nullptr);

unsigned path = 0;
while (count)
Expand Down Expand Up @@ -1628,7 +1628,7 @@ void TestStress2()
conn->changeMode(RTM_LOCK_READ);

Owned<CStressPoolFactory> factory = new CStressPoolFactory();
Owned<IThreadPool> threadPool = createThreadPool("Stress2 Thread Pool", factory, NULL, 60);
Owned<IThreadPool> threadPool = createThreadPool("Stress2 Thread Pool", factory, false, nullptr, 60);

unsigned totalCount = 0;
unsigned subCount = 1;
Expand Down Expand Up @@ -1818,7 +1818,7 @@ void TestExternal()
class CSubTest : public Thread
{
public:
CSubTest(const char *_path) : path(_path) { start(); }
CSubTest(const char *_path) : path(_path) { start(false); }

virtual int run()
{
Expand Down Expand Up @@ -2562,8 +2562,8 @@ void TestSDS2()
CClientTestSDS *t1 = new CClientTestSDS();
CClientTestSDS *t2 = new CClientTestSDS();

t1->start();
t2->start();
t1->start(false);
t2->start(false);

t1->join();
t2->join();
Expand Down Expand Up @@ -2694,7 +2694,7 @@ void TestSDS3(IGroup *group)

unsigned nthreads = testParams.ordinality()?atoi(testParams.item(0)):10;
ReadWriteLock reinitLock;
Owned<IThreadPool> pool = createThreadPool("TSDS1", &poolFactory, NULL, nthreads);
Owned<IThreadPool> pool = createThreadPool("TSDS1", &poolFactory, false, nullptr, nthreads);

SDS3Params params;
params.reinitLock = &reinitLock;
Expand Down Expand Up @@ -2817,7 +2817,7 @@ void TestNodeSubs()
}
} poolFactory;

Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, NULL, 100, 100000);
Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, false, nullptr, 100, 100000);

unsigned tests = testParams.ordinality() ? atoi(testParams.item(0)) : 10;
for (unsigned t=0; t<tests; t++)
Expand Down Expand Up @@ -3179,7 +3179,7 @@ static void TestCriticalSection()
}
unsigned k;
for (k=0;k<NCCSTHREAD; k++)
threads[k]->start();
threads[k]->start(false);
for (k=0;k<NCCSTHREAD; k++)
threads[k]->join();
}
Expand Down Expand Up @@ -3225,7 +3225,7 @@ static void TestMemThreads()
}
unsigned k;
for (k=0;k<NCCSTHREAD; k++)
threads[k]->start();
threads[k]->start(false);
for (k=0;k<NCCSTHREAD; k++)
threads[k]->join();
}
Expand Down Expand Up @@ -3345,7 +3345,7 @@ int main(int argc, char* argv[])
#if defined(TEST_MEMTHREADS)
printf("start...\n");
TestMemThread2 t("test");
t.start();
t.start(false);
t.join();
printf("end...\n");
return 0;
Expand Down
6 changes: 4 additions & 2 deletions dali/dfu/dfurepl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ struct ReplicateFileItem: extends CInterface
{
CriticalBlock block(sect);
stopping = false;
thread.start();
//MORE: This seems inefficient to create a thread for each item being replicated
//rather than using a thread pool. Should possibly pass true to start() to link to the dfuwu.
thread.start(false);
}

void stop()
Expand Down Expand Up @@ -471,7 +473,7 @@ class CReplicateServer: public CInterface, implements IThreaded, implements IRep
{
stopping = false;
thread.setown(new CThreaded("ReplicateServerThread"));
thread->init(this);
thread->init(this, false);
}

virtual void threadmain() override
Expand Down
4 changes: 2 additions & 2 deletions dali/dfu/dfurun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class CDFUengine: public CInterface, implements IDFUengine
PROGLOG("DFU server waiting on queue %s",queuename);
cDFUlistener *lt = new cDFUlistener(this,queuename,false,serverstatus);
listeners.append(*lt);
lt->start();
lt->start(false);
}

void startMonitor(const char *queuename,CSDSServerStatus *serverstatus,unsigned timeout)
Expand All @@ -599,7 +599,7 @@ class CDFUengine: public CInterface, implements IDFUengine
PROGLOG("DFU monitor waiting on queue %s timeout %d",queuename,timeout);
cDFUlistener *lt = new cDFUmonitor(this,queuename,serverstatus,timeout);
listeners.append(*lt);
lt->start();
lt->start(false);
}

void joinListeners()
Expand Down
Loading

0 comments on commit 16f61e0

Please sign in to comment.