From 4f24b051b46667c9601e22860429b2770b287eb5 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Fri, 1 Mar 2024 12:50:41 +0000 Subject: [PATCH] HPCC-31389 HPCC Refactor the way per-thread job information is initialised Signed-off-by: Gavin Halliday --- common/thorhelper/persistent.cpp | 6 +- common/thorhelper/thorpipe.cpp | 2 +- common/thorhelper/thorsoapcall.cpp | 4 +- common/thorhelper/thorstrand.cpp | 4 +- common/workunit/workunit.cpp | 2 +- dali/base/daaudit.cpp | 2 +- dali/base/dadfs.cpp | 2 +- dali/base/dadiags.cpp | 2 +- dali/base/danqs.cpp | 6 +- dali/base/dasds.cpp | 10 +-- dali/base/dasess.cpp | 8 +- dali/base/dasubs.cpp | 4 +- dali/base/dautils.cpp | 2 +- dali/dalidiag/dalidiag.cpp | 4 +- dali/datest/datest.cpp | 26 +++--- dali/dfu/dfurepl.cpp | 6 +- dali/dfu/dfurun.cpp | 4 +- dali/dfuplus/dfuplus.cpp | 2 +- dali/ft/daftformat.cpp | 2 +- dali/ft/filecopy.cpp | 4 +- dali/sasha/saarch.cpp | 2 +- dali/sasha/sacoalescer.cpp | 4 +- dali/sasha/saqmon.cpp | 2 +- dali/sasha/saserver.cpp | 4 +- dali/sasha/saverify.cpp | 4 +- dali/sasha/saxref.cpp | 4 +- deployment/deploy/DeploymentEngine.cpp | 2 +- ecl/agentexec/agentexec.cpp | 2 +- ecl/eclagent/eclagent.cpp | 14 +-- ecl/eclcc/eclcc.cpp | 4 +- ecl/eclccserver/eclccserver.cpp | 10 +-- ecl/eclcmd/eclcmd_common.cpp | 2 +- ecl/hthor/hthor.cpp | 2 +- ecl/hthor/hthorkey.cpp | 8 +- ecl/schedulectrl/eventqueue.cpp | 4 +- esp/bindings/http/platform/httpprot.cpp | 10 ++- esp/clients/ws_dfsclient/ws_dfsclient.cpp | 2 +- esp/clients/wsdfuaccess/wsdfuaccess.cpp | 2 +- esp/logging/logginglib/logthread.cpp | 2 +- esp/logging/logginglib/logthread.hpp | 2 +- esp/platform/espcfg.cpp | 2 +- esp/platform/espp.hpp | 2 +- esp/platform/espthread.cpp | 6 -- esp/platform/espthread.hpp | 1 - esp/services/WsDeploy/WsDeployService.hpp | 10 +-- esp/services/esdl_svc_engine/esdl_monitor.cpp | 2 +- esp/services/ws_dfu/ws_dfuXRefService.cpp | 2 +- esp/services/ws_fs/ws_fsService.cpp | 2 +- esp/services/ws_machine/ws_machineService.cpp | 2 +- .../ws_workunits/ws_workunitsHelpers.cpp | 2 +- .../ws_workunits/ws_workunitsService.cpp | 4 +- .../ws_workunits/ws_workunitsService.hpp | 2 +- esp/smc/SMCLib/InfoCacheReader.hpp | 2 +- esp/test/httptest/httptest.cpp | 8 +- esp/tools/soapplus/http.cpp | 2 +- esp/tools/soapplus/httpproxy.cpp | 12 +-- esp/tools/soapplus/main.cpp | 4 +- fs/dafilesrv/dafilesrv.cpp | 2 +- fs/dafsserver/dafsserver.cpp | 9 +- plugins/cassandra/cassandrawu.cpp | 2 +- plugins/couchbase/couchbaseembed.cpp | 2 +- plugins/kafka/kafka.cpp | 4 +- plugins/mysql/mysqlembed.cpp | 2 +- roxie/ccd/ccddali.cpp | 6 +- roxie/ccd/ccdfile.cpp | 8 +- roxie/ccd/ccdlistener.cpp | 6 +- roxie/ccd/ccdmain.cpp | 2 +- roxie/ccd/ccdprotocol.cpp | 6 +- roxie/ccd/ccdqueue.cpp | 12 +-- roxie/ccd/ccdserver.cpp | 10 +-- roxie/ccd/ccdsnmp.cpp | 2 +- roxie/ccd/ccdstate.cpp | 4 +- roxie/roxiemem/roxiemem.cpp | 12 +-- roxie/roxiepipe/roxiepipe.cpp | 2 +- roxie/udplib/udpsha.cpp | 6 +- roxie/udplib/udptrr.cpp | 14 +-- roxie/udplib/udptrs.cpp | 10 +-- roxie/udplib/uttest.cpp | 6 +- rtl/eclrtl/eclrtl.cpp | 6 +- system/jhtree/keydiff.cpp | 2 +- system/jlib/jcomp.cpp | 2 +- system/jlib/jdebug.cpp | 2 +- system/jlib/jfile.cpp | 2 +- system/jlib/jlog.cpp | 45 +++++++++- system/jlib/jlog.hpp | 18 +++- system/jlib/jpqueue.hpp | 3 +- system/jlib/jsecrets.cpp | 2 +- system/jlib/jsmartsock.cpp | 4 +- system/jlib/jsocket.cpp | 8 +- system/jlib/jsort.cpp | 2 +- system/jlib/jtask.cpp | 2 +- system/jlib/jthread.cpp | 87 +++++++++---------- system/jlib/jthread.hpp | 21 +++-- system/mp/mpcomm.cpp | 8 +- system/mp/mplog.cpp | 6 +- system/mp/mplog.ipp | 2 +- system/mp/mputil.hpp | 2 +- system/mp/test/mptest.cpp | 12 +-- .../ldapsecuritytest/ldapsecuritytest.cpp | 2 +- system/xmllibtest/xmllibtest.cpp | 6 +- testing/unittests/dalitests.cpp | 6 +- testing/unittests/jlibtests.cpp | 8 +- testing/unittests/unittests.cpp | 48 +++++----- thorlcr/activities/funnel/thfunnelslave.cpp | 2 +- .../hashdistrib/thhashdistribslave.cpp | 8 +- .../keyedjoin/thkeyedjoinslave-legacy.cpp | 8 +- .../activities/keyedjoin/thkeyedjoinslave.cpp | 4 +- .../lookupjoin/thlookupjoinslave.cpp | 8 +- thorlcr/activities/loop/thloopslave.cpp | 2 +- thorlcr/activities/merge/thmergeslave.cpp | 2 +- thorlcr/activities/msort/thsortu.cpp | 8 +- .../activities/nsplitter/thnsplitterslave.cpp | 2 +- thorlcr/activities/piperead/thprslave.cpp | 2 +- thorlcr/activities/project/thprojectslave.cpp | 2 +- thorlcr/activities/thactivityutil.cpp | 4 +- thorlcr/activities/thactivityutil.ipp | 2 +- thorlcr/activities/wuidwrite/thwuidwrite.cpp | 2 +- thorlcr/graph/thgraph.cpp | 2 +- thorlcr/graph/thgraphmaster.cpp | 6 +- thorlcr/master/mawatchdog.cpp | 2 +- thorlcr/master/thgraphmanager.cpp | 17 +++- thorlcr/master/thmastermain.cpp | 4 +- thorlcr/msort/tsorts.cpp | 2 +- thorlcr/msort/tsorts1.cpp | 2 +- thorlcr/slave/backup.cpp | 2 +- thorlcr/slave/slave.cpp | 2 +- thorlcr/slave/slavmain.cpp | 6 +- thorlcr/slave/slwatchdog.cpp | 2 +- thorlcr/slave/thslavemain.cpp | 2 +- thorlcr/thorutil/thmem.cpp | 2 +- thorlcr/thorutil/thormisc.cpp | 2 +- thorlcr/thorutil/thormisc.hpp | 2 +- tools/testsocket/testsocket.cpp | 6 +- 133 files changed, 429 insertions(+), 374 deletions(-) diff --git a/common/thorhelper/persistent.cpp b/common/thorhelper/persistent.cpp index 9a3889e6195..11d0ef6783e 100644 --- a/common/thorhelper/persistent.cpp +++ b/common/thorhelper/persistent.cpp @@ -378,10 +378,10 @@ class CPersistentHandler : implements IPersistentHandler, implements ISocketSele } //Thread - virtual void start() override + virtual void start(bool inheritThreadContext) override { m_selectHandler->start(); - Thread::start(); + Thread::start(inheritThreadContext); PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d started with max idle time %d and max requests %d", m_id, m_maxIdleTime, m_maxReqs); } @@ -462,6 +462,6 @@ int CPersistentHandler::CurID = 0; IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList) { Owned handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel, enableDoNotReuseList); - handler->start(); + handler->start(false); return handler.getClear(); } diff --git a/common/thorhelper/thorpipe.cpp b/common/thorhelper/thorpipe.cpp index 1e0246aae17..f90b5923209 100644 --- a/common/thorhelper/thorpipe.cpp +++ b/common/thorhelper/thorpipe.cpp @@ -75,7 +75,7 @@ class CPipeErrorHelper : public Thread, implements IPipeErrorHelper void run(IPipeProcess *_pipe) { pipe.set(_pipe); - this->start(); + this->start(true); } void wait() diff --git a/common/thorhelper/thorsoapcall.cpp b/common/thorhelper/thorsoapcall.cpp index 903bbb52909..c8707a01ad6 100644 --- a/common/thorhelper/thorsoapcall.cpp +++ b/common/thorhelper/thorsoapcall.cpp @@ -404,7 +404,7 @@ class BlackLister : public CInterface, implements IThreadFactory BlackLister() { - pool.setown(createThreadPool("SocketBlacklistPool", this, NULL, 0, 0)); + pool.setown(createThreadPool("SocketBlacklistPool", this, false, nullptr, 0, 0)); } ISocket* connect(SocketEndpoint &ep, @@ -1244,7 +1244,7 @@ class CWSCHelper : implements IWSCHelper, public CInterface complete = aborted = timeLimitExceeded = false; ForEachItemIn(i,threads) - threads.item(i).start(); + threads.item(i).start(true); // inherit context because the threads always have a shorter lifetime than the caller. } void abort() { diff --git a/common/thorhelper/thorstrand.cpp b/common/thorhelper/thorstrand.cpp index c876c33105d..f38c755d12b 100644 --- a/common/thorhelper/thorstrand.cpp +++ b/common/thorhelper/thorstrand.cpp @@ -41,7 +41,7 @@ class CStrandBarrier : public CInterfaceOf { CThreaded * thread = new CThreaded("Strand", &strand); threads.append(*thread); - thread->start(); + thread->start(true); } virtual void waitForStrands() @@ -109,7 +109,7 @@ class CStrandJunction : public CInterfaceOf { CThreaded * thread = new CThreaded("ReadAheadThread", &mainthread); threads.append(*thread); - thread->start(); + thread->start(true); } void processConsumerStop() diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index ee9d3fecbf9..e5c391e9731 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -14433,7 +14433,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP } } pollthread(jq, &workunit, timelimit*1000); - pollthread.start(); + pollthread.start(false); { Owned w = &workunit.lock(); diff --git a/dali/base/daaudit.cpp b/dali/base/daaudit.cpp index a5004def56a..a09308c8806 100644 --- a/dali/base/daaudit.cpp +++ b/dali/base/daaudit.cpp @@ -236,7 +236,7 @@ class CDaliAuditServer: public IDaliServer, public Thread void start() { - Thread::start(); + Thread::start(false); } void ready() diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 6e2c6b5f113..887cafe07f3 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -10840,7 +10840,7 @@ class CDaliDFSServer: public Thread, public CTransactionLogTracker, implements I void start() { - Thread::start(); + Thread::start(false); } void ready() diff --git a/dali/base/dadiags.cpp b/dali/base/dadiags.cpp index a6d8e5b06c0..961a732ec29 100644 --- a/dali/base/dadiags.cpp +++ b/dali/base/dadiags.cpp @@ -58,7 +58,7 @@ class CDaliDiagnosticsServer: public IDaliServer, public Thread void start() { - Thread::start(); + Thread::start(false); } void ready() diff --git a/dali/base/danqs.cpp b/dali/base/danqs.cpp index b2fd281d936..bf8bd7885fe 100644 --- a/dali/base/danqs.cpp +++ b/dali/base/danqs.cpp @@ -167,7 +167,7 @@ class CNamedQueueSubscriptionProxy: public Thread { owner = _owner; finished = false; - start(); + start(false); } ~CNamedQueueSubscriptionProxy(); @@ -292,7 +292,7 @@ class CQueueChannel: implements IQueueChannel, public CInterface return 0; } } timeoutthread(this,timeout); - timeoutthread.start(); + timeoutthread.start(false); bool ret = handler.get(buf); timeoutthread.sem.signal(); timeoutthread.join(); @@ -493,7 +493,7 @@ class CDaliNamedQueueServer: public IDaliServer, public Thread, implements IConn void start() { - Thread::start(); + Thread::start(false); } void ready() diff --git a/dali/base/dasds.cpp b/dali/base/dasds.cpp index ce63df31a7a..dcbc2bd7aa4 100644 --- a/dali/base/dasds.cpp +++ b/dali/base/dasds.cpp @@ -1410,7 +1410,7 @@ class CDeltaWriter : implements IThreaded PROGLOG("%s", msg.str()); if ((transactionQueueLimit > 1) && (transactionMaxMem > 0)) - threaded.init(this); + threaded.init(this, false); else PROGLOG("All transactions will be committed synchronously"); } @@ -5075,7 +5075,7 @@ class CLightCoalesceThread : implements ICoalesce, public CInterface CLightCoalesceThread *coalesce; public: CThreaded() : Thread("CLightCoalesceThread") { coalesce = NULL; } - void init(CLightCoalesceThread *_coalesce) { coalesce = _coalesce; start(); } + void init(CLightCoalesceThread *_coalesce) { coalesce = _coalesce; start(false); } virtual int run() { coalesce->threadmain(); return 1; } } threaded; public: @@ -7269,7 +7269,7 @@ unsigned CCovenSDSManager::queryCount(const char *xpath) void CCovenSDSManager::start() { - server.start(); + server.start(false); if (coalesce) coalesce->start(); } @@ -8367,7 +8367,7 @@ void CCovenSDSManager::handleNotify(CSubscriberContainerBase *_subscriber, Memor if (!notifyPool) { CNotifyPoolFactory *factory = new CNotifyPoolFactory; - notifyPool.setown(createThreadPool("SDS Notification Pool", factory, this, SUBNTFY_POOL_SIZE)); + notifyPool.setown(createThreadPool("SDS Notification Pool", factory, false, this, SUBNTFY_POOL_SIZE)); factory->Release(); } @@ -8699,7 +8699,7 @@ void CCovenSDSManager::startNotification(IPropertyTree &changeTree, CPTStack &st if (!scanNotifyPool) { CScanNotifyPoolFactory *factory = new CScanNotifyPoolFactory; - scanNotifyPool.setown(createThreadPool("SDS Scan-Notification Pool", factory, this, SUBSCAN_POOL_SIZE)); + scanNotifyPool.setown(createThreadPool("SDS Scan-Notification Pool", factory, false, this, SUBSCAN_POOL_SIZE)); factory->Release(); } diff --git a/dali/base/dasess.cpp b/dali/base/dasess.cpp index ba4a1e58b57..e356731ea86 100644 --- a/dali/base/dasess.cpp +++ b/dali/base/dasess.cpp @@ -198,7 +198,7 @@ class CdelayedTerminate: public Thread // slightly obfuscated stop code CdelayedTerminate(byte _err) { err = _err; - start(); + start(false); Release(); Sleep(100); } @@ -1176,7 +1176,7 @@ class CLdapWorkItem : public Thread ret = CLDAPE_ldapfailure; if (!running) { running = true; - Thread::start(); + Thread::start(false); } contsem.signal(); } @@ -1283,7 +1283,7 @@ class CCovenSessionManager: public CSessionManagerBase, implements ISessionManag void start() { - sessionrequestserver.start(); + sessionrequestserver.start(false); } void stop() @@ -1927,7 +1927,7 @@ bool registerClientProcess(ICommunicator *comm, IGroup *& retcoven,unsigned time return 0; } } *t = new cThread(comm,r,remaining); - t->start(); + t->start(false); t->sem.wait(remaining); ok = t->ok; if (t->exc.get()) { diff --git a/dali/base/dasubs.cpp b/dali/base/dasubs.cpp index 4b040530c92..d31f3660ed1 100644 --- a/dali/base/dasubs.cpp +++ b/dali/base/dasubs.cpp @@ -162,7 +162,7 @@ class CDaliPublisherServer: public IDaliServer, public Thread, public CDaliPubli void start() { running = true; - Thread::start(); + Thread::start(false); } void ready() { @@ -552,7 +552,7 @@ class CDaliPublisherClient: public Thread, public CDaliPublisher : Thread("CDaliPublisherClient") { running = true; - start(); + start(false); } ~CDaliPublisherClient() diff --git a/dali/base/dautils.cpp b/dali/base/dautils.cpp index 287c4334072..aba5b398f91 100644 --- a/dali/base/dautils.cpp +++ b/dali/base/dautils.cpp @@ -2442,7 +2442,7 @@ class CTimedCache void start() { - thread.start(); + thread.start(false); } void stop() diff --git a/dali/dalidiag/dalidiag.cpp b/dali/dalidiag/dalidiag.cpp index dfc9dabcbad..1aee4c12dfd 100644 --- a/dali/dalidiag/dalidiag.cpp +++ b/dali/dalidiag/dalidiag.cpp @@ -455,8 +455,8 @@ void nqPingPong(const char *q,const char *q2) threads[0].num = 1; threads[1].q = q; threads[1].num = 2; - threads[0].start(); - threads[1].start(); + threads[0].start(false); + threads[1].start(false); threads[0].join(); threads[1].join(); } diff --git a/dali/datest/datest.cpp b/dali/datest/datest.cpp index c0c7ecaf50d..88b23181d3e 100644 --- a/dali/datest/datest.cpp +++ b/dali/datest/datest.cpp @@ -1178,7 +1178,7 @@ class TSDSTestPool : public CInterface, implements IThreadFactory class CQPutTest : public Thread { public: - CQPutTest() : Thread("CQPutTest") { start(); } + CQPutTest() : Thread("CQPutTest") { start(false); } virtual int run() { try { @@ -1193,7 +1193,7 @@ class CQPutTest : public Thread class CQGetTest : public Thread { public: - CQGetTest() : Thread("CQPutTest") { start(); } + CQGetTest() : Thread("CQPutTest") { start(false); } virtual int run() { try { @@ -1319,7 +1319,7 @@ class CChange : public Thread conn.setown(querySDS().connect(_path, myProcessSession(), RTM_CREATE_QUERY, 1000000)); id1 = id2 = 0; - start(); + start(false); } virtual int run() { @@ -1407,7 +1407,7 @@ void TestStress() getTest.setown(new CQGetTest()); } TSDSTestPool poolFactory; - Owned pool = createThreadPool("TSDSTest", &poolFactory); + Owned pool = createThreadPool("TSDSTest", &poolFactory, false, nullptr); unsigned path = 0; while (count) @@ -1628,7 +1628,7 @@ void TestStress2() conn->changeMode(RTM_LOCK_READ); Owned factory = new CStressPoolFactory(); - Owned threadPool = createThreadPool("Stress2 Thread Pool", factory, NULL, 60); + Owned threadPool = createThreadPool("Stress2 Thread Pool", factory, false, nullptr, 60); unsigned totalCount = 0; unsigned subCount = 1; @@ -1818,7 +1818,7 @@ void TestExternal() class CSubTest : public Thread { public: - CSubTest(const char *_path) : path(_path) { start(); } + CSubTest(const char *_path) : path(_path) { start(false); } virtual int run() { @@ -2562,8 +2562,8 @@ void TestSDS2() CClientTestSDS *t1 = new CClientTestSDS(); CClientTestSDS *t2 = new CClientTestSDS(); - t1->start(); - t2->start(); + t1->start(false); + t2->start(false); t1->join(); t2->join(); @@ -2694,7 +2694,7 @@ void TestSDS3(IGroup *group) unsigned nthreads = testParams.ordinality()?atoi(testParams.item(0)):10; ReadWriteLock reinitLock; - Owned pool = createThreadPool("TSDS1", &poolFactory, NULL, nthreads); + Owned pool = createThreadPool("TSDS1", &poolFactory, false, nullptr, nthreads); SDS3Params params; params.reinitLock = &reinitLock; @@ -2817,7 +2817,7 @@ void TestNodeSubs() } } poolFactory; - Owned pool = createThreadPool("TSDSTest", &poolFactory, NULL, 100, 100000); + Owned pool = createThreadPool("TSDSTest", &poolFactory, false, nullptr, 100, 100000); unsigned tests = testParams.ordinality() ? atoi(testParams.item(0)) : 10; for (unsigned t=0; tstart(); + threads[k]->start(false); for (k=0;kjoin(); } @@ -3225,7 +3225,7 @@ static void TestMemThreads() } unsigned k; for (k=0;kstart(); + threads[k]->start(false); for (k=0;kjoin(); } @@ -3345,7 +3345,7 @@ int main(int argc, char* argv[]) #if defined(TEST_MEMTHREADS) printf("start...\n"); TestMemThread2 t("test"); - t.start(); + t.start(false); t.join(); printf("end...\n"); return 0; diff --git a/dali/dfu/dfurepl.cpp b/dali/dfu/dfurepl.cpp index f7b9c90a13c..cc431bdd4a8 100644 --- a/dali/dfu/dfurepl.cpp +++ b/dali/dfu/dfurepl.cpp @@ -229,7 +229,9 @@ struct ReplicateFileItem: extends CInterface { CriticalBlock block(sect); stopping = false; - thread.start(); + //MORE: This seems inefficient to create a thread for each item being replicated + //rather than using a thread pool. Should possibly pass true to start() to link to the dfuwu. + thread.start(false); } void stop() @@ -471,7 +473,7 @@ class CReplicateServer: public CInterface, implements IThreaded, implements IRep { stopping = false; thread.setown(new CThreaded("ReplicateServerThread")); - thread->init(this); + thread->init(this, false); } virtual void threadmain() override diff --git a/dali/dfu/dfurun.cpp b/dali/dfu/dfurun.cpp index 6dc1e1ee0b2..5e4ae47c28f 100644 --- a/dali/dfu/dfurun.cpp +++ b/dali/dfu/dfurun.cpp @@ -589,7 +589,7 @@ class CDFUengine: public CInterface, implements IDFUengine PROGLOG("DFU server waiting on queue %s",queuename); cDFUlistener *lt = new cDFUlistener(this,queuename,false,serverstatus); listeners.append(*lt); - lt->start(); + lt->start(false); } void startMonitor(const char *queuename,CSDSServerStatus *serverstatus,unsigned timeout) @@ -599,7 +599,7 @@ class CDFUengine: public CInterface, implements IDFUengine PROGLOG("DFU monitor waiting on queue %s timeout %d",queuename,timeout); cDFUlistener *lt = new cDFUmonitor(this,queuename,serverstatus,timeout); listeners.append(*lt); - lt->start(); + lt->start(false); } void joinListeners() diff --git a/dali/dfuplus/dfuplus.cpp b/dali/dfuplus/dfuplus.cpp index 0a729edbee4..f3d5c667640 100644 --- a/dali/dfuplus/dfuplus.cpp +++ b/dali/dfuplus/dfuplus.cpp @@ -155,7 +155,7 @@ bool CDfuPlusHelper::runLocalDaFileSvr(SocketEndpoint &listenep,bool requireauth progress("Started local Dali file server on %s%s\n", printep.getEndpointHostText(eps).str(), addlPort.str()); } - thr->start(); + thr->start(false); if (timeout==0) { setDafsTrace(nullptr,0); // disable client tracing diff --git a/dali/ft/daftformat.cpp b/dali/ft/daftformat.cpp index 02040383302..ecd55b91eaf 100644 --- a/dali/ft/daftformat.cpp +++ b/dali/ft/daftformat.cpp @@ -1855,7 +1855,7 @@ void CRemotePartitioner::calcPartitions(Semaphore * _sem) sem = _sem; #ifdef RUN_SLAVES_ON_THREADS - start(); + start(true); #else run(); #endif diff --git a/dali/ft/filecopy.cpp b/dali/ft/filecopy.cpp index d93ade88777..2288c874ae4 100644 --- a/dali/ft/filecopy.cpp +++ b/dali/ft/filecopy.cpp @@ -234,7 +234,7 @@ void FileTransferThread::go(Semaphore & _sem) else { #ifdef RUN_SLAVES_ON_THREADS - start(); + start(true); #else transferAndSignal(); #endif @@ -2021,7 +2021,7 @@ void FileSprayer::gatherFileSizes(FilePartInfoArray & fileSizeQueue, bool errorI for (idx = 0; idx < numThreads; idx++) threads.append(*new FileSizeThread(fileSizeQueue, fileSizeCS, compressedInput&&!copyCompressed, errorIfMissing)); for (idx = 0; idx < numThreads; idx++) - threads.item(idx).start(); + threads.item(idx).start(true); for (;;) { bool alldone = true; StringBuffer err; diff --git a/dali/sasha/saarch.cpp b/dali/sasha/saarch.cpp index acc51010536..84c55425791 100644 --- a/dali/sasha/saarch.cpp +++ b/dali/sasha/saarch.cpp @@ -1416,7 +1416,7 @@ class CSashaArchiverServerBase : public CSimpleInterfaceOf, implem } virtual void start() override { - threaded.start(); + threaded.start(false); } virtual void ready() override { diff --git a/dali/sasha/sacoalescer.cpp b/dali/sasha/sacoalescer.cpp index 7f22927c80a..7341411429a 100644 --- a/dali/sasha/sacoalescer.cpp +++ b/dali/sasha/sacoalescer.cpp @@ -188,7 +188,7 @@ class CSashaSDSCoalescingServer: public ISashaServer, public Thread { CriticalBlock b(suspendResumeCrit); stopped = false; - Thread::start(); + Thread::start(false); } void ready() @@ -221,7 +221,7 @@ class CSashaSDSCoalescingServer: public ISashaServer, public Thread if (!stopped) return; PROGLOG("COALESCER: resume"); stopped = false; - Thread::start(); + Thread::start(false); ready(); } diff --git a/dali/sasha/saqmon.cpp b/dali/sasha/saqmon.cpp index feab09e4f24..544c6c2880b 100644 --- a/dali/sasha/saqmon.cpp +++ b/dali/sasha/saqmon.cpp @@ -49,7 +49,7 @@ class CSashaQMonitorServer: public ISashaServer, public Thread void start() { - Thread::start(); + Thread::start(false); } void ready() diff --git a/dali/sasha/saserver.cpp b/dali/sasha/saserver.cpp index 16c8a4c221c..8dcc464b59d 100644 --- a/dali/sasha/saserver.cpp +++ b/dali/sasha/saserver.cpp @@ -250,7 +250,7 @@ void SashaMain() return new CSashaCmdThread; } } factory; - Owned threadpool = createThreadPool("sashaCmdPool",&factory); + Owned threadpool = createThreadPool("sashaCmdPool",&factory, false, nullptr); CMessageBuffer mb; while (!stopped) { try { @@ -461,7 +461,7 @@ int main(int argc, const char* argv[]) { CThreaded threaded; public: - CStopThread() : threaded("CStopThread") { threaded.init(this); } + CStopThread() : threaded("CStopThread") { threaded.init(this, false); } ~CStopThread() { threaded.join(); } virtual void threadmain() override { diff --git a/dali/sasha/saverify.cpp b/dali/sasha/saverify.cpp index aec54d09e2c..774fc4c7855 100644 --- a/dali/sasha/saverify.cpp +++ b/dali/sasha/saverify.cpp @@ -391,7 +391,7 @@ class CSashaVerifierServer: public ISashaServer, public Thread void start() { - Thread::start(); + Thread::start(false); } void ready() @@ -488,7 +488,7 @@ class CSashaDaFSMonitorServer: public ISashaServer, public Thread void start() { - Thread::start(); + Thread::start(false); } void ready() diff --git a/dali/sasha/saxref.cpp b/dali/sasha/saxref.cpp index a7ef0ce6685..eb223992d97 100644 --- a/dali/sasha/saxref.cpp +++ b/dali/sasha/saxref.cpp @@ -2033,7 +2033,7 @@ class CSashaXRefServer: public ISashaServer, public Thread void start() { - Thread::start(); + Thread::start(false); } void ready() @@ -2290,7 +2290,7 @@ class CSashaExpiryServer: public ISashaServer, public Thread void start() { - Thread::start(); + Thread::start(false); } void ready() diff --git a/deployment/deploy/DeploymentEngine.cpp b/deployment/deploy/DeploymentEngine.cpp index e86eaea0e86..9a182729961 100644 --- a/deployment/deploy/DeploymentEngine.cpp +++ b/deployment/deploy/DeploymentEngine.cpp @@ -1454,7 +1454,7 @@ void CDeploymentEngine::copyInstallFiles(const char* instanceName, int instanceI if (m_threadPool == NULL) { IThreadFactory* pThreadFactory = createDeployTaskThreadFactory(); - m_threadPool.setown(createThreadPool("Deploy Task Thread Pool", pThreadFactory, this, DEPLOY_THREAD_POOL_SIZE)); + m_threadPool.setown(createThreadPool("Deploy Task Thread Pool", pThreadFactory, false, this, DEPLOY_THREAD_POOL_SIZE)); pThreadFactory->Release(); } else diff --git a/ecl/agentexec/agentexec.cpp b/ecl/agentexec/agentexec.cpp index 3cb3364ebb4..ca5b686c2f2 100644 --- a/ecl/agentexec/agentexec.cpp +++ b/ecl/agentexec/agentexec.cpp @@ -91,7 +91,7 @@ CEclAgentExecutionServer::CEclAgentExecutionServer(IPropertyTree *_config) : con if (isContainerized()) // JCS - the pool approach would also work in bare-metal if it could be configured. { unsigned poolSize = config->getPropInt("@maxActive", 100); - pool.setown(createThreadPool("agentPool", this, NULL, poolSize, INFINITE)); + pool.setown(createThreadPool("agentPool", this, false, nullptr, poolSize, INFINITE)); } } diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 1284dcc8b20..6d4f793b19d 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -135,7 +135,7 @@ void logGetResult(const char * name, const char * stepname, unsigned sequence) interface IHThorDebugSocketListener : extends IInterface { - virtual void start() = 0; + virtual void start(bool inheritThreadContext) = 0; virtual bool stop(unsigned timeout) = 0; virtual void stopListening() = 0; virtual unsigned queryPort() const = 0; @@ -166,13 +166,13 @@ class CHThorDebugSocketListener : public Thread, implements IHThorDebugSocketLis unsigned poolSize = 10;//MORE : What is a good threadcoutn? // Note we allow a few additional threads than requested - these are the threads that return "Too many active queries" responses - pool.setown(createThreadPool("HThorSocketWorkerPool", this, NULL, poolSize+5, INFINITE)); + pool.setown(createThreadPool("HThorSocketWorkerPool", this, false, nullptr, poolSize+5, INFINITE)); } - virtual void start() + virtual void start(bool inheritThreadContext) override { assertex(!running); - Thread::start(); + Thread::start(inheritThreadContext); started.wait(); } @@ -454,7 +454,7 @@ void CHThorDebugContext::debugInitialize(const char *_id, const char *_queryName //Start debug socket listener thread listener.setown(new CHThorDebugSocketListener(this)); - listener->start(); + listener->start(false); } void CHThorDebugContext::debugInterrupt(IXmlWriter *output) @@ -524,7 +524,7 @@ EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bo clusterNames.append(wuRead->queryClusterName()); clusterWidth = -1; abortmonitor = new cAbortMonitor(*this); - abortmonitor->start(); + abortmonitor->start(false); EnableSEHtoExceptionMapping(); setSEHtoExceptionHandler(abortmonitor); retcode = 0; @@ -3771,7 +3771,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], Ownedstart(); + threads[i]->start(false); } } diff --git a/ecl/eclccserver/eclccserver.cpp b/ecl/eclccserver/eclccserver.cpp index 37837589bcf..6260d815069 100644 --- a/ecl/eclccserver/eclccserver.cpp +++ b/ecl/eclccserver/eclccserver.cpp @@ -700,8 +700,8 @@ class EclccCompileThread : implements IPooledThread, implements IErrorReporter, cycle_t startCompile = get_cycles_now(); if (!pipe->run(eclccProgName, eclccCmd, ".", true, false, true, 0, true)) throw makeStringExceptionV(999, "Failed to run eclcc command %s", eclccCmd.str()); - errorReader->start(); - abortWaiter.start(); + errorReader->start(true); + abortWaiter.start(true); try { pipe->write(eclQuery.s.length(), eclQuery.s.str()); @@ -892,7 +892,7 @@ class EclccCompileThread : implements IPooledThread, implements IErrorReporter, virtual void threadmain() override { - setDefaultJobId(wuid, true); + setDefaultJobName(wuid); DBGLOG("Compile request processing for workunit %s", wuid.get()); Owned config = getComponentConfig(); @@ -1079,7 +1079,7 @@ static void generatePrecompiledHeader() cmd.append("eclcc -pch"); if (pipe->run("eclcc", cmd, ".", false, false, true, 0)) { - errorReader->start(); + errorReader->start(true); unsigned retcode = pipe->wait(); errorReader->join(); if (retcode != 0 || errorReader->errCount() != 0) @@ -1182,7 +1182,7 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb { threadsActive = 0; running = false; - pool.setown(createThreadPool("eclccServerPool", this, NULL, poolSize, INFINITE)); + pool.setown(createThreadPool("eclccServerPool", this, false, nullptr, poolSize, INFINITE)); serverstatus.queryProperties()->setProp("@cluster", getComponentConfigSP()->queryProp("@name")); serverstatus.commitProperties(); reloadConfigHook.installOnce(std::bind(&EclccServer::configUpdate, this), false); diff --git a/ecl/eclcmd/eclcmd_common.cpp b/ecl/eclcmd/eclcmd_common.cpp index fb83b5a1ddd..d44f5c94b3d 100644 --- a/ecl/eclcmd/eclcmd_common.cpp +++ b/ecl/eclcmd/eclcmd_common.cpp @@ -545,7 +545,7 @@ class ConvertEclParameterToArchive } StringBuffer errors; Owned errorReader = new EclCmdErrorReader(pipe, errors); - errorReader->start(); + errorReader->start(false); if (pipe->hasInput()) { diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index 036d6beb263..4ccb5c9860c 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -1690,7 +1690,7 @@ class CHThorPipeThroughActivity : public CHThorSimpleActivityBase, implements IP OwnedRoxieString pipeProgram(helper.getPipeProgram()); openPipe(pipeProgram); } - puller.start(); + puller.start(true); } void stop() diff --git a/ecl/hthor/hthorkey.cpp b/ecl/hthor/hthorkey.cpp index da82a9ef7cf..241641bd0f6 100644 --- a/ecl/hthor/hthorkey.cpp +++ b/ecl/hthor/hthorkey.cpp @@ -2205,7 +2205,7 @@ class CHThorThreadedActivityBase : public CHThorActivityBase, implements IThread } } inputThread.setown(new InputHandler(this)); - inputThread->start(); + inputThread->start(true); } protected: @@ -2236,7 +2236,7 @@ class CHThorFetchActivityBase : public CHThorThreadedActivityBase, public IFetch virtual void initializeThreadPool() { - threadPool.setown(createThreadPool("hthor fetch activity thread pool", &threadFactory)); + threadPool.setown(createThreadPool("hthor fetch activity thread pool", &threadFactory, true, nullptr)); } virtual void initParts(IDistributedFile * f) @@ -3099,7 +3099,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, IContextLogger & _contextLogger) : owner(_owner), file(f), agent(_agent), contextLogger(_contextLogger) { - threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory)); + threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory, true, nullptr)); IDistributedSuperFile *super = f->querySuperFile(); if (super) { @@ -3470,7 +3470,7 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I virtual void initializeThreadPool() { - threadPool.setown(createThreadPool("hthor keyed join fetch thread pool", &threadFactory)); + threadPool.setown(createThreadPool("hthor keyed join fetch thread pool", &threadFactory, true, nullptr)); } virtual void initParts(IDistributedFile * f) diff --git a/ecl/schedulectrl/eventqueue.cpp b/ecl/schedulectrl/eventqueue.cpp index 0c079f91adb..539340728aa 100644 --- a/ecl/schedulectrl/eventqueue.cpp +++ b/ecl/schedulectrl/eventqueue.cpp @@ -189,8 +189,8 @@ class CScheduleEventProcessor : public CInterface, implements IScheduleEventProc virtual void start() { - puller->start(); - timer->start(); + puller->start(false); + timer->start(false); } virtual void stop() diff --git a/esp/bindings/http/platform/httpprot.cpp b/esp/bindings/http/platform/httpprot.cpp index e4e6af64ec1..5a821558e65 100644 --- a/esp/bindings/http/platform/httpprot.cpp +++ b/esp/bindings/http/platform/httpprot.cpp @@ -89,7 +89,7 @@ void CHttpProtocol::init(IPropertyTree * cfg, const char * process, const char * if(!http_pool_factory) http_pool_factory = new CHttpThreadPoolFactory(); if(!http_thread_pool) - http_thread_pool = createThreadPool("Http Thread", http_pool_factory, NULL, m_maxConcurrentThreads, INFINITE); + http_thread_pool = createThreadPool("Http Thread", http_pool_factory, false, nullptr, m_maxConcurrentThreads, INFINITE); } } @@ -175,7 +175,8 @@ bool CHttpProtocol::notifySelected(ISocket *sock,unsigned selected, IPersistentH CHttpThread *workthread = new CHttpThread(accepted.getLink(), apport, CEspProtocol::getViewConfig(), false, nullptr, persistentHandler); workthread->setMaxRequestEntityLength(getMaxRequestEntityLength()); workthread->setShouldClose(shouldClose); - workthread->start(); + workthread->start(false); + //MORE: The caller should wait for the thread to finish, otherwise the program can crash on exit workthread->Release(); } } @@ -284,7 +285,7 @@ void CSecureHttpProtocol::init(IPropertyTree * cfg, const char * process, const if(!http_pool_factory) http_pool_factory = new CHttpThreadPoolFactory(); if(!http_thread_pool) - http_thread_pool = createThreadPool("Http Thread", http_pool_factory, NULL, m_maxConcurrentThreads, INFINITE); + http_thread_pool = createThreadPool("Http Thread", http_pool_factory, false, nullptr, m_maxConcurrentThreads, INFINITE); } } @@ -348,8 +349,9 @@ bool CSecureHttpProtocol::notifySelected(ISocket *sock,unsigned selected, IPersi CHttpThread *workthread = new CHttpThread(accepted.getLink(), apport, CEspProtocol::getViewConfig(), true, m_ssctx.get(), persistentHandler); workthread->setMaxRequestEntityLength(getMaxRequestEntityLength()); workthread->setShouldClose(shouldClose); - workthread->start(); + workthread->start(false); ESPLOG(LogMax, "Request processing thread started."); + //MORE: The caller should wait for the thread to finish, otherwise the program can crash on exit workthread->Release(); } } diff --git a/esp/clients/ws_dfsclient/ws_dfsclient.cpp b/esp/clients/ws_dfsclient/ws_dfsclient.cpp index 3aa39d35bbd..f83b4eff2bc 100644 --- a/esp/clients/ws_dfsclient/ws_dfsclient.cpp +++ b/esp/clients/ws_dfsclient/ws_dfsclient.cpp @@ -49,7 +49,7 @@ class CKeepAliveThread : public CSimpleInterface, implements IThreaded public: CKeepAliveThread(unsigned _periodSecs) : threaded("CKeepAliveThread", this), periodMs(_periodSecs * 1000) { - threaded.start(); + threaded.start(false); } void stop() { diff --git a/esp/clients/wsdfuaccess/wsdfuaccess.cpp b/esp/clients/wsdfuaccess/wsdfuaccess.cpp index b8d59e7189e..69299b57cec 100644 --- a/esp/clients/wsdfuaccess/wsdfuaccess.cpp +++ b/esp/clients/wsdfuaccess/wsdfuaccess.cpp @@ -655,7 +655,7 @@ class DFUAccessTests : public CppUnit::TestFixture public: CServerThread(IRemoteFileServer *_server, ISocket *_socket) : server(_server), socket(_socket), threaded("CServerThread") { - threaded.init(this); + threaded.init(this, false); } ~CServerThread() { diff --git a/esp/logging/logginglib/logthread.cpp b/esp/logging/logginglib/logthread.cpp index 76a30bf7f62..3f51d0db707 100644 --- a/esp/logging/logginglib/logthread.cpp +++ b/esp/logging/logginglib/logthread.cpp @@ -128,7 +128,7 @@ CLogThread::~CLogThread() void CLogThread::start() { - Thread::start(); + Thread::start(false); } int CLogThread::run() diff --git a/esp/logging/logginglib/logthread.hpp b/esp/logging/logginglib/logthread.hpp index 8bdc8dbed3c..5d2345e95d6 100644 --- a/esp/logging/logginglib/logthread.hpp +++ b/esp/logging/logginglib/logthread.hpp @@ -87,7 +87,7 @@ class CLogRequestReader : public CInterface, implements ILogRequestReader CLogRequestReader(CLogRequestReaderSettings* _settings, CLogThread* _logThread) : settings(_settings), logThread(_logThread), threaded("LogRequestReader") { - threaded.init(this); + threaded.init(this, false); }; ~CLogRequestReader(); diff --git a/esp/platform/espcfg.cpp b/esp/platform/espcfg.cpp index 21a0fb87422..60c495aa97f 100644 --- a/esp/platform/espcfg.cpp +++ b/esp/platform/espcfg.cpp @@ -266,7 +266,7 @@ void CEspConfig::initSDSSessionCleaner(bool detachedFromDali) Owned proc_cfg = getProcessConfig(m_envpt, m_process); m_sessionCleaner.setown(new CSessionCleaner(espSessionSDSPath.str(), detachedFromDali, proc_cfg->getPropInt("@checkSessionTimeoutSeconds", ESP_CHECK_SESSION_TIMEOUT))); - m_sessionCleaner->start(); + m_sessionCleaner->start(false); } } diff --git a/esp/platform/espp.hpp b/esp/platform/espp.hpp index ffc3682b455..22fc9b742ef 100644 --- a/esp/platform/espp.hpp +++ b/esp/platform/espp.hpp @@ -146,7 +146,7 @@ class CEspServer : public CInterface, // YMA: there'll be a leak here, but it's ok. CEspTerminator* terminator = new CEspTerminator; - terminator->start(); + terminator->start(false); m_exiting=true; if(!m_useDali) diff --git a/esp/platform/espthread.cpp b/esp/platform/espthread.cpp index 1c07578c329..2daf2a3bf26 100644 --- a/esp/platform/espthread.cpp +++ b/esp/platform/espthread.cpp @@ -67,12 +67,6 @@ CEspProtocolThread::~CEspProtocolThread() { } -void CEspProtocolThread::start() -{ - Thread::start(); -} - - const char *CEspProtocolThread::getServiceName() { return m_name.get(); diff --git a/esp/platform/espthread.hpp b/esp/platform/espthread.hpp index eee5f3db913..42ce2ff1643 100644 --- a/esp/platform/espthread.hpp +++ b/esp/platform/espthread.hpp @@ -48,7 +48,6 @@ class CEspProtocolThread: public Thread, implements ISocketReturner CEspProtocolThread(ISocket *sock, const char *name = "Unknown service type"); virtual ~CEspProtocolThread(); - virtual void start(); void setSocket(ISocket *sock); void stop(bool wait); diff --git a/esp/services/WsDeploy/WsDeployService.hpp b/esp/services/WsDeploy/WsDeployService.hpp index b4f480f17f5..47a858b3617 100644 --- a/esp/services/WsDeploy/WsDeployService.hpp +++ b/esp/services/WsDeploy/WsDeployService.hpp @@ -127,7 +127,7 @@ class CWsDeployFileInfo : public CInterface, implements IConfigFileObserver { m_pWorkerThread = new CThreaded("CGenerateJSFactoryThread"); IThreaded* pIThreaded = this; - m_pWorkerThread->init(pIThreaded); + m_pWorkerThread->init(pIThreaded, false); } void refresh(IConstEnvironment* pConstEnv) @@ -258,7 +258,7 @@ class CWsDeployFileInfo : public CInterface, implements IConfigFileObserver { m_pWorkerThread = new CThreaded("CConfigFileMonitorThread"); IThreaded* pIThreaded = this; - m_pWorkerThread->init(pIThreaded); + m_pWorkerThread->init(pIThreaded, false); } }; @@ -324,7 +324,7 @@ class CWsDeployFileInfo : public CInterface, implements IConfigFileObserver m_quitThread = false; m_pWorkerThread = new CThreaded("CClientAliveThread"); IThreaded* pIThreaded = this; - m_pWorkerThread->init(pIThreaded); + m_pWorkerThread->init(pIThreaded, false); } void signal() @@ -376,7 +376,7 @@ class CWsDeployFileInfo : public CInterface, implements IConfigFileObserver m_quitThread = false; m_pWorkerThread = new CThreaded("CLockerAliveThread"); IThreaded* pIThreaded = this; - m_pWorkerThread->init(pIThreaded); + m_pWorkerThread->init(pIThreaded, false); } void signal() @@ -650,7 +650,7 @@ class CCloudActionHandler : public CInterface, implements IInterface if (m_threadPool == NULL) { IThreadFactory* pThreadFactory = new CCloudTaskThreadFactory(); - m_threadPool.setown(createThreadPool("WsDeploy Cloud Task Thread Pool", pThreadFactory, NULL, pComputers->numChildren())); + m_threadPool.setown(createThreadPool("WsDeploy Cloud Task Thread Pool", pThreadFactory, false, nullptr, pComputers->numChildren())); pThreadFactory->Release(); } else diff --git a/esp/services/esdl_svc_engine/esdl_monitor.cpp b/esp/services/esdl_svc_engine/esdl_monitor.cpp index 42cf2aef585..14210595c72 100644 --- a/esp/services/esdl_svc_engine/esdl_monitor.cpp +++ b/esp/services/esdl_svc_engine/esdl_monitor.cpp @@ -160,7 +160,7 @@ class CEsdlMonitor : implements IEsdlMonitor, public CInterface, implements IEsd constructEnvptTemplate(); m_pCentralStore.setown(getEsdlCentralStore(true)); m_esdlShare.setown(new CEsdlShare()); - m_esdlShare->start(); + m_esdlShare->start(false); DBGLOG("EsdlMonitor started."); } diff --git a/esp/services/ws_dfu/ws_dfuXRefService.cpp b/esp/services/ws_dfu/ws_dfuXRefService.cpp index c2dde784c38..0bd88094f0f 100644 --- a/esp/services/ws_dfu/ws_dfuXRefService.cpp +++ b/esp/services/ws_dfu/ws_dfuXRefService.cpp @@ -121,7 +121,7 @@ void CWsDfuXRefEx::init(IPropertyTree *cfg, const char *process, const char *ser //Start out builder thread...... m_XRefbuilder.setown(new CXRefExBuilderThread()); - m_XRefbuilder->start(); + m_XRefbuilder->start(false); } bool CWsDfuXRefEx::onDFUXRefArrayAction(IEspContext &context, IEspDFUXRefArrayActionRequest &req, IEspDFUXRefArrayActionResponse &resp) diff --git a/esp/services/ws_fs/ws_fsService.cpp b/esp/services/ws_fs/ws_fsService.cpp index 2f8fb5a27c0..9ac375c37ff 100644 --- a/esp/services/ws_fs/ws_fsService.cpp +++ b/esp/services/ws_fs/ws_fsService.cpp @@ -185,7 +185,7 @@ void CFileSprayEx::init(IPropertyTree *cfg, const char *process, const char *ser throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in your configuration file"); } - m_sched.start(); + m_sched.start(false); } diff --git a/esp/services/ws_machine/ws_machineService.cpp b/esp/services/ws_machine/ws_machineService.cpp index 0cfe8ef991c..26f833ce5cc 100644 --- a/esp/services/ws_machine/ws_machineService.cpp +++ b/esp/services/ws_machine/ws_machineService.cpp @@ -198,7 +198,7 @@ void Cws_machineEx::init(IPropertyTree *cfg, const char *process, const char *se //Start thread pool Owned pThreadFactory = new CWsMachineThreadFactory(); m_threadPool.setown(createThreadPool("WsMachine Thread Pool", pThreadFactory, - NULL, m_threadPoolSize, 10000, m_threadPoolStackSize)); //10 sec timeout for available thread; use stack size of 2MB + false, nullptr, m_threadPoolSize, 10000, m_threadPoolStackSize)); //10 sec timeout for available thread; use stack size of 2MB setupLegacyFilters(); diff --git a/esp/services/ws_workunits/ws_workunitsHelpers.cpp b/esp/services/ws_workunits/ws_workunitsHelpers.cpp index 8598752f888..309183dfd82 100644 --- a/esp/services/ws_workunits/ws_workunitsHelpers.cpp +++ b/esp/services/ws_workunits/ws_workunitsHelpers.cpp @@ -4033,7 +4033,7 @@ void CWsWuFileHelper::createThorSlaveLogfile(IConstWorkUnit* cwu, WsWuInfo& winf Owned threadFactory = new CGetThorSlaveLogToFileThreadFactory(); Owned threadPool = createThreadPool("WsWuFileHelper GetThorSlaveLogToFile Thread Pool", - threadFactory, NULL, thorSlaveLogThreadPoolSize, INFINITE); + threadFactory, false, nullptr, thorSlaveLogThreadPoolSize, INFINITE); unsigned numberOfSlaveLogs = clusterInfo->getNumberOfSlaveLogs(); BoolHash uniqueProcesses; diff --git a/esp/services/ws_workunits/ws_workunitsService.cpp b/esp/services/ws_workunits/ws_workunitsService.cpp index 4d77967a471..8a93cb5fa29 100644 --- a/esp/services/ws_workunits/ws_workunitsService.cpp +++ b/esp/services/ws_workunits/ws_workunitsService.cpp @@ -481,12 +481,12 @@ void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *s #ifdef _CONTAINERIZED initContainerRoxieTargets(roxieConnMap); #endif - m_sched.start(); + m_sched.start(false); //Start thread pool xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ClusterQueryStateThreadPoolSize", process, service); Owned threadFactory = new CClusterQueryStateThreadFactory(); - clusterQueryStatePool.setown(createThreadPool("CheckAndSetClusterQueryState Thread Pool", threadFactory, NULL, + clusterQueryStatePool.setown(createThreadPool("CheckAndSetClusterQueryState Thread Pool", threadFactory, false, nullptr, cfg->getPropInt(xpath.str(), CHECK_QUERY_STATUS_THREAD_POOL_SIZE))); } diff --git a/esp/services/ws_workunits/ws_workunitsService.hpp b/esp/services/ws_workunits/ws_workunitsService.hpp index 962456f315b..c0748b42e6a 100644 --- a/esp/services/ws_workunits/ws_workunitsService.hpp +++ b/esp/services/ws_workunits/ws_workunitsService.hpp @@ -71,7 +71,7 @@ class QueryFilesInUse : public CInterface, implements ISDSSubscription, implemen { tree.setown(createPTree("QueryFilesInUse")); updateUsers(); - threaded.init(this); + threaded.init(this, false); } ~QueryFilesInUse() diff --git a/esp/smc/SMCLib/InfoCacheReader.hpp b/esp/smc/SMCLib/InfoCacheReader.hpp index 9ea2b3d707a..614175717dd 100644 --- a/esp/smc/SMCLib/InfoCacheReader.hpp +++ b/esp/smc/SMCLib/InfoCacheReader.hpp @@ -79,7 +79,7 @@ class TPWRAPPER_API CInfoCacheReaderThread : public CSimpleInterfaceOfstart(); + thrdlist[i]->start(false); for(i = 0; i < thrds; i++) thrdlist[i]->join(); @@ -946,7 +946,7 @@ CHttpProxyThread::CHttpProxyThread(ISocket* client, FILE* ofile) void CHttpProxyThread::start() { - Thread::start(); + Thread::start(false); } int CHttpProxyThread::run() @@ -1008,8 +1008,8 @@ int CHttpProxyThread::run() m_remotesocket->set_nonblock(false); CReadWriteThread t1(m_client.get(), m_remotesocket.get()); CReadWriteThread t2(m_remotesocket.get(), m_client.get()); - t1.start(); - t2.start(); + t1.start(false); + t2.start(false); t1.join(); t2.join(); //printf("read/write threads returned\n"); diff --git a/esp/tools/soapplus/http.cpp b/esp/tools/soapplus/http.cpp index ceb70c6fba8..69344b14d08 100644 --- a/esp/tools/soapplus/http.cpp +++ b/esp/tools/soapplus/http.cpp @@ -1012,7 +1012,7 @@ int HttpClient::sendStressRequests(HttpStat* overall_stat) //__int64 start = msTick(); for(i = 0; i < m_stressthreads; i++) - thrdlist[i]->start(); + thrdlist[i]->start(false); if(http_tracelevel > 0) fprintf(m_logfile, "Started %d stress test threads.\n", m_stressthreads); diff --git a/esp/tools/soapplus/httpproxy.cpp b/esp/tools/soapplus/httpproxy.cpp index 4d9a8d303b2..59e0fd0fcdd 100644 --- a/esp/tools/soapplus/httpproxy.cpp +++ b/esp/tools/soapplus/httpproxy.cpp @@ -315,7 +315,7 @@ CHttpProxyThread::CHttpProxyThread(ISocket* client, FILE* ofile) void CHttpProxyThread::start() { - Thread::start(); + Thread::start(false); } int CHttpProxyThread::run() @@ -377,8 +377,8 @@ int CHttpProxyThread::run() m_remotesocket->set_nonblock(false); CReadWriteThread t1(m_client.get(), m_remotesocket.get(), m_ofile); CReadWriteThread t2(m_remotesocket.get(), m_client.get(), m_ofile); - t1.start(); - t2.start(); + t1.start(true); + t2.start(true); t1.join(); t2.join(); //printf("read/write threads returned\n"); @@ -643,7 +643,7 @@ class CSocksProxyThread : public Thread virtual void start() { - Thread::start(); + Thread::start(false); } virtual int run() @@ -716,8 +716,8 @@ class CSocksProxyThread : public Thread m_remotesocket->set_nonblock(false); CReadWriteThread t1(m_client.get(), m_remotesocket.get(), m_ofile); CReadWriteThread t2(m_remotesocket.get(), m_client.get(), m_ofile); - t1.start(); - t2.start(); + t1.start(true); + t2.start(true); t1.join(); t2.join(); m_remotesocket->close(); diff --git a/esp/tools/soapplus/main.cpp b/esp/tools/soapplus/main.cpp index a7344bfd888..5ac48f6f7e7 100644 --- a/esp/tools/soapplus/main.cpp +++ b/esp/tools/soapplus/main.cpp @@ -339,7 +339,7 @@ int processRequest(IProperties* globals, SoapPlusAction action, const char* url, } ThreadedSimpleServer server(globals, port, sfname.str(), writeToFiles?outpath.str():NULL, writeToFiles, 1); - server.start(); + server.start(false); HttpClient client(globals, url, file.queryFilename(), writeToFiles?outpath.str():NULL, outfilename, writeToFiles); client.start(); if(http_tracelevel >= 5) @@ -360,7 +360,7 @@ int processRequest(IProperties* globals, SoapPlusAction action, const char* url, return -1; } ThreadedSimpleServer server(globals, port, server_in_fname, writeToFiles?outpath.str():NULL, writeToFiles, 1); - server.start(); + server.start(false); HttpClient client(globals, url, in_fname, writeToFiles?outpath.str():NULL, outfilename, writeToFiles); client.start(); if(http_tracelevel >= 5) diff --git a/fs/dafilesrv/dafilesrv.cpp b/fs/dafilesrv/dafilesrv.cpp index 4d35c88c43e..0b991377e75 100644 --- a/fs/dafilesrv/dafilesrv.cpp +++ b/fs/dafilesrv/dafilesrv.cpp @@ -761,7 +761,7 @@ int main(int argc, const char* argv[]) { PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Initialized"); started = true; - pollthread.start(); + pollthread.start(false); return true; } diff --git a/fs/dafsserver/dafsserver.cpp b/fs/dafsserver/dafsserver.cpp index 5cb9a26b118..02cef6b2998 100644 --- a/fs/dafsserver/dafsserver.cpp +++ b/fs/dafsserver/dafsserver.cpp @@ -467,7 +467,8 @@ class CAsyncCommandManager void start() { parent.wait(); - thread->start(); + //These are async jobs - so do not preserve the active thread context because it will go out of scope + thread->start(false); } void join() { @@ -3710,7 +3711,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface } }; Owned factory = new CCommandFactory(*this); // NB: pool links factory, so takes ownership - threads.setown(createThreadPool("CRemoteFileServerPool", factory, NULL, maxThreads, maxThreadsDelayMs, + threads.setown(createThreadPool("CRemoteFileServerPool", factory, false, nullptr, maxThreads, maxThreadsDelayMs, #ifdef __64BIT__ 0, // Unlimited stack size #else @@ -6002,7 +6003,7 @@ class RemoteFileSlowTest : public CppUnit::TestFixture public: CServerThread(CRemoteFileServer *_server, ISocket *_socket) : server(_server), socket(_socket), threaded("CServerThread") { - threaded.init(this); + threaded.init(this, false); } ~CServerThread() { @@ -6199,7 +6200,7 @@ class RemoteFileSlowTest : public CppUnit::TestFixture public: CDelayedFileCreate(const char *_filePath) : filePath(_filePath), threaded("CDelayedFileCreate") { - threaded.init(this); + threaded.init(this, false); } ~CDelayedFileCreate() { diff --git a/plugins/cassandra/cassandrawu.cpp b/plugins/cassandra/cassandrawu.cpp index 6315ad7800b..293b0c21ea1 100644 --- a/plugins/cassandra/cassandrawu.cpp +++ b/plugins/cassandra/cassandrawu.cpp @@ -3316,7 +3316,7 @@ class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandr E->Release(); DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)"); } - cacheRetirer.start(); + cacheRetirer.start(false); LINK(_dll); // Yes, this leaks. Not really sure how to avoid that. } diff --git a/plugins/couchbase/couchbaseembed.cpp b/plugins/couchbase/couchbaseembed.cpp index 613571a8a99..d55e69507ef 100644 --- a/plugins/couchbase/couchbaseembed.cpp +++ b/plugins/couchbase/couchbaseembed.cpp @@ -636,7 +636,7 @@ namespace couchbaseembed if (!isAlive()) { shouldRun = true; - Thread::start(); + Thread::start(false); } } diff --git a/plugins/kafka/kafka.cpp b/plugins/kafka/kafka.cpp index 1da664ebc10..5ae3e9e24b5 100644 --- a/plugins/kafka/kafka.cpp +++ b/plugins/kafka/kafka.cpp @@ -261,7 +261,7 @@ namespace KafkaPlugin if (!isAlive() && parentPtr) { shouldRun = true; - Thread::start(); + Thread::start(false); } } @@ -860,7 +860,7 @@ namespace KafkaPlugin if (!isAlive()) { shouldRun = true; - Thread::start(); + Thread::start(false); } } diff --git a/plugins/mysql/mysqlembed.cpp b/plugins/mysql/mysqlembed.cpp index e80e987bd59..af829a92ad3 100644 --- a/plugins/mysql/mysqlembed.cpp +++ b/plugins/mysql/mysqlembed.cpp @@ -245,7 +245,7 @@ class MySQLConnection : public CInterface if (!connectionCloserThread) { connectionCloserThread = new MySQLConnectionCloserThread; - connectionCloserThread->start(); + connectionCloserThread->start(false); } } } diff --git a/roxie/ccd/ccddali.cpp b/roxie/ccd/ccddali.cpp index 5725c987339..bdab2a2c650 100644 --- a/roxie/ccd/ccddali.cpp +++ b/roxie/ccd/ccddali.cpp @@ -216,10 +216,6 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface aborted = true; abortSem.signal(); } - virtual void start() - { - Thread::start(); - } virtual void join() { if (isAlive()) @@ -497,7 +493,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface #endif } if (fileNameServiceDali.length()) - connectWatcher.start(); + connectWatcher.start(false); else initMyNode(1); // Hack } diff --git a/roxie/ccd/ccdfile.cpp b/roxie/ccd/ccdfile.cpp index b0733aa6aa8..e1e7cb3f1c5 100644 --- a/roxie/ccd/ccdfile.cpp +++ b/roxie/ccd/ccdfile.cpp @@ -1420,12 +1420,12 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress delete activeCacheReportingBuffer; } - virtual void start() + virtual void start() override { if (!started) { - bct.start(); - hct.start(); + bct.start(false); + hct.start(false); bctStarted.wait(); hctStarted.wait(); } @@ -1440,7 +1440,7 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress #endif if (activeCacheReportingBuffer && cacheReportPeriodSeconds) { - cidt.start(); + cidt.start(false); cidtStarted.wait(); cidtActive = true; } diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index 6427dfc6991..1549af1f34c 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -757,12 +757,12 @@ class RoxieListener : public Thread, implements IHpccProtocolListener, implement #endif } - virtual void start() + virtual void start() override { // Note we allow a few additional threads than requested - these are the threads that return "Too many active queries" responses - pool.setown(createThreadPool("RoxieSocketWorkerPool", this, NULL, poolSize+5, INFINITE)); + pool.setown(createThreadPool("RoxieSocketWorkerPool", this, false, nullptr, poolSize+5, INFINITE)); assertex(!running); - Thread::start(); + Thread::start(false); started.wait(); } diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 73ffb10cc15..5057891032b 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -778,7 +778,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) Owned standAloneDll; if (wuid) - setDefaultJobId(wuid); + setDefaultJobName(wuid); if (topology->hasProp("@loadWorkunit")) { StringBuffer workunitName; diff --git a/roxie/ccd/ccdprotocol.cpp b/roxie/ccd/ccdprotocol.cpp index d5a4fa01e84..c294c90b83d 100644 --- a/roxie/ccd/ccdprotocol.cpp +++ b/roxie/ccd/ccdprotocol.cpp @@ -120,12 +120,12 @@ class ProtocolListener : public Thread, implements IHpccProtocolListener, implem #endif } - virtual void start() + virtual void start() override { // Note we allow a few additional threads than requested - these are the threads that return "Too many active queries" responses - pool.setown(createThreadPool("RoxieSocketWorkerPool", this, NULL, sink->getPoolSize()+5, INFINITE)); + pool.setown(createThreadPool("RoxieSocketWorkerPool", this, false, nullptr, sink->getPoolSize()+5, INFINITE)); assertex(!running); - Thread::start(); + Thread::start(false); started.wait(); } diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index c9c3593b5d3..8219a9bbfb4 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -1158,7 +1158,7 @@ class RoxieQueue : public CInterface, implements IThreadFactory { headRegionSize = _headRegionSize; numWorkers = _numWorkers; - workers.setown(createThreadPool("RoxieWorkers", this, NULL, numWorkers)); + workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers)); started = 0; idle = 0; if (IBYTIbufferSize) @@ -2046,7 +2046,7 @@ class RoxieThrottledPacketSender : public Thread RoxieThrottledPacketSender(TokenBucket &_bucket, unsigned _maxPacketSize) : Thread("RoxieThrottledPacketSender"), bucket(_bucket), maxPacketSize(_maxPacketSize) { - start(); + start(false); started.wait(); } @@ -2891,7 +2891,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase { RoxieReceiverBase::start(); running = true; - readThread.start(); + readThread.start(false); } void stop() @@ -3830,9 +3830,9 @@ class PacketDiscarder : public Thread, implements IPacketDiscarder return 0; } - virtual void start() + virtual void start() override { - Thread::start(); + Thread::start(false); } virtual void stop() @@ -3983,7 +3983,7 @@ extern void startPingTimer() if (!pingTimer) { pingTimer = new PingTimer(); - pingTimer->start(); + pingTimer->start(false); } } diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 67e88252f3e..555d8ac29ae 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -135,7 +135,7 @@ class RestartableThread : public CInterface RestartableThread(const char *_name) : name(_name) { } - virtual void start(const char *namePrefix) + virtual void start(const char *namePrefix, bool inheritThreadContext) { StringBuffer s(namePrefix); s.append(name); @@ -143,7 +143,7 @@ class RestartableThread : public CInterface CriticalBlock b(crit); assertex(!thread); thread.setown(new MyThread(this, s)); - thread->start(); + thread->start(inheritThreadContext); } } @@ -2617,7 +2617,7 @@ class RecordPullerThread : public RestartableThread StringBuffer logPrefix("["); if (ctx) ctx->getLogPrefix(logPrefix); logPrefix.append("] "); - RestartableThread::start(logPrefix); + RestartableThread::start(logPrefix, true); } } } @@ -16044,7 +16044,7 @@ void LoopExecutorThread::start(unsigned parentExtractSize, const byte *parentExt eof = false; StringBuffer logPrefix("["); ctx->getLogPrefix(logPrefix).append("] "); - RestartableThread::start(logPrefix); + RestartableThread::start(logPrefix, true); } int LoopExecutorThread::run() @@ -28049,7 +28049,7 @@ class CActivityGraph : implements IActivityGraph, implements IThorChildGraph, im { parentExtract = _parentExtract; parentExtractSize = _parentExtractSize; - thread.start(); + thread.start(true); } inline void join() { diff --git a/roxie/ccd/ccdsnmp.cpp b/roxie/ccd/ccdsnmp.cpp index ba6fc6c9dd3..a4b913c994e 100644 --- a/roxie/ccd/ccdsnmp.cpp +++ b/roxie/ccd/ccdsnmp.cpp @@ -321,7 +321,7 @@ CRoxieMetricsManager::CRoxieMetricsManager() addMetric(flowRequestsSent, 1000); addMetric(flowPermitsReceived, 1000); addMetric(dataPacketsSent, 1000); - ticker.start(); + ticker.start(false); } void CRoxieMetricsManager::doAddMetric(RelaxedAtomic &counter, const char *name, unsigned interval, bool isMinVal) diff --git a/roxie/ccd/ccdstate.cpp b/roxie/ccd/ccdstate.cpp index 0387f243b83..6082592361b 100644 --- a/roxie/ccd/ccdstate.cpp +++ b/roxie/ccd/ccdstate.cpp @@ -217,7 +217,7 @@ class DelayedReleaserThread : public Thread CriticalBlock b(lock); if (!started) { - start(); + start(false); started = true; } queue.append(*new DelayedReleaseQueueItem(goer, delaySeconds)); @@ -2002,7 +2002,7 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme reload(false); daliHelper->commitCache(); controlSem.signal(); - autoReloadThread.start(); // Don't want to overlap auto-reloads with the initial load + autoReloadThread.start(false); // Don't want to overlap auto-reloads with the initial load } catch(IException *E) { diff --git a/roxie/roxiemem/roxiemem.cpp b/roxie/roxiemem/roxiemem.cpp index 7643b56a99c..c572752779b 100644 --- a/roxie/roxiemem/roxiemem.cpp +++ b/roxie/roxiemem/roxiemem.cpp @@ -4336,7 +4336,7 @@ class BufferedRowCallbackManager if (!backgroundReleaseBuffersThread) { backgroundReleaseBuffersThread.setown(new BackgroundReleaseBufferThread(this)); - backgroundReleaseBuffersThread->start(); + backgroundReleaseBuffersThread->start(false); } } @@ -4360,7 +4360,7 @@ class BufferedRowCallbackManager if (!releaseBuffersThread) { releaseBuffersThread.setown(new ReleaseBufferThread(*this)); - releaseBuffersThread->start(); + releaseBuffersThread->start(false); } } else @@ -7481,7 +7481,7 @@ class RoxieMemTests : public CppUnit::TestFixture for (unsigned i1 = 0; i1 < numThreads; i1++) threads[i1] = new BitmapAllocatorThread(sem, size, numThreads); for (unsigned i2 = 0; i2 < numThreads; i2++) - threads[i2]->start(); + threads[i2]->start(false); unsigned startTime = msTick(); sem.signal(numThreads); @@ -7931,7 +7931,7 @@ class RoxieMemTests : public CppUnit::TestFixture { for (unsigned i2 = 0; i2 < numCasThreads; i2++) { - threads[i2]->start(); + threads[i2]->start(false); } bool aborted = false; @@ -8888,8 +8888,8 @@ class RoxieMemTests : public CppUnit::TestFixture callback.rows = (const void * *)rowManager->allocate(1, 0); Owned releaser = new ReleaseThread(callback, rowManager); Owned resizer = new ResizeThread(callback, rowManager); - releaser->start(); - resizer->start(); + releaser->start(false); + resizer->start(false); resizer->join(); releaser->join(); CPPUNIT_ASSERT(!callback.failed); diff --git a/roxie/roxiepipe/roxiepipe.cpp b/roxie/roxiepipe/roxiepipe.cpp index 1349d9710c1..4a9e700b6a0 100644 --- a/roxie/roxiepipe/roxiepipe.cpp +++ b/roxie/roxiepipe/roxiepipe.cpp @@ -694,7 +694,7 @@ int main(int argc, char *argv[]) { rt[i] = new RoxieThread(query.str(), resultName.str()); PROGLOG("Starting thread %d", i); - rt[i]->start(); + rt[i]->start(true); } unsigned totalRead = 0; diff --git a/roxie/udplib/udpsha.cpp b/roxie/udplib/udpsha.cpp index a2ac2986ed8..36fe5b4aa80 100644 --- a/roxie/udplib/udpsha.cpp +++ b/roxie/udplib/udpsha.cpp @@ -815,10 +815,10 @@ class DelayedSocketWriter : public Thread } return 0; } - virtual void start() + virtual void start(bool inheritThreadContext) override { running = true; - Thread::start(); + Thread::start(inheritThreadContext); } void stop() { @@ -835,7 +835,7 @@ CSimulatedQueueWriteSocket::CSimulatedQueueWriteSocket(const SocketEndpoint &ep) { CriticalBlock b(allWriteSocketsCrit); if (!allWriteSockets.length()) - delayedWriter.start(); + delayedWriter.start(false); allWriteSockets.append(*this); } } diff --git a/roxie/udplib/udptrr.cpp b/roxie/udplib/udptrr.cpp index 8dec8182b4f..9a31fc0d287 100644 --- a/roxie/udplib/udptrr.cpp +++ b/roxie/udplib/udptrr.cpp @@ -980,7 +980,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface } } - virtual void start() + virtual void start(bool inheritThreadContext) override { running = true; if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0) @@ -997,7 +997,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface flow_socket->set_receive_buffer_size(udpFlowSocketsSize); size32_t actualSize = flow_socket->get_receive_buffer_size(); DBGLOG("UdpReceiver: receive_receive_flow created port=%d sockbuffsize=%d actual %d", flow_port, udpFlowSocketsSize, actualSize); - Thread::start(); + Thread::start(inheritThreadContext); } void doFlowRequest(const UdpRequestToSendMsg &msg) @@ -1297,10 +1297,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface running = false; } - virtual void start() + virtual void start(bool inheritThreadContext) override { running = true; - Thread::start(); + Thread::start(inheritThreadContext); started.wait(); } @@ -1509,9 +1509,9 @@ class CReceiveManager : implements IReceiveManager, public CInterface receive_flow = new receive_receive_flow(*this, server_flow_port, maxSlotsPerClient); running = true; - collatorThread.start(); - data->start(); - receive_flow->start(); + collatorThread.start(false); + data->start(false); + receive_flow->start(false); MilliSleep(15); } diff --git a/roxie/udplib/udptrs.cpp b/roxie/udplib/udptrs.cpp index c873d0ea2ce..c223094c064 100644 --- a/roxie/udplib/udptrs.cpp +++ b/roxie/udplib/udptrs.cpp @@ -833,10 +833,10 @@ class CSendManager : implements ISendManager, public CInterface } } - virtual void start() + virtual void start(bool inheritThreadContext) override { running = true; - Thread::start(); + Thread::start(inheritThreadContext); started.wait(); } @@ -933,7 +933,7 @@ class CSendManager : implements ISendManager, public CInterface send_resend_flow(CSendManager &_parent) : StartedThread("UdpLib::send_resend_flow"), parent(_parent) { - start(); + start(false); } ~send_resend_flow() @@ -970,7 +970,7 @@ class CSendManager : implements ISendManager, public CInterface flow_socket->set_receive_buffer_size(udpFlowSocketsSize); size32_t actualSize = flow_socket->get_receive_buffer_size(); DBGLOG("UdpSender[%s]: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", parent.myId, receive_port, udpFlowSocketsSize, actualSize); - start(); + start(false); } ~send_receive_flow() @@ -1057,7 +1057,7 @@ class CSendManager : implements ISendManager, public CInterface { if (check_max_socket_write_buffer(udpLocalWriteSocketSize) < 0) throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max write buffer is less than %i", udpLocalWriteSocketSize); - start(); + start(false); } ~send_data() diff --git a/roxie/udplib/uttest.cpp b/roxie/udplib/uttest.cpp index 87a4d07a9b4..de3f2472c0a 100644 --- a/roxie/udplib/uttest.cpp +++ b/roxie/udplib/uttest.cpp @@ -158,7 +158,7 @@ class Receiver : public Thread virtual void start() { - Thread::start(); + Thread::start(false); started.wait(); } @@ -395,7 +395,7 @@ void rawSendTest() { DBGLOG("Starting sender %d on port %d", senders+1, startPort); SendAsFastAsPossible *newSender = new SendAsFastAsPossible(startPort++, rawBufferSize); - newSender->start(); + newSender->start(false); Sleep(10000); } } @@ -592,7 +592,7 @@ void sortSimulator() for (unsigned i = 0; i < numSortSlaves; i++) { slaves[i].init(&master, i); - slaves[i].start(); + slaves[i].start(false); } for (unsigned j = 0; j < numSortSlaves; j++) { diff --git a/rtl/eclrtl/eclrtl.cpp b/rtl/eclrtl/eclrtl.cpp index 980ebf2198e..b5d73720409 100644 --- a/rtl/eclrtl/eclrtl.cpp +++ b/rtl/eclrtl/eclrtl.cpp @@ -6581,9 +6581,9 @@ class EclRtlTests : public CppUnit::TestFixture RegexTestThread t1; RegexTestThread t2; RegexTestThread t3; - t1.start(); - t2.start(); - t3.start(); + t1.start(false); + t2.start(false); + t3.start(false); t1.join(); t2.join(); t3.join(); diff --git a/system/jhtree/keydiff.cpp b/system/jhtree/keydiff.cpp index 2e9ac69d91d..d1b302cfa7c 100644 --- a/system/jhtree/keydiff.cpp +++ b/system/jhtree/keydiff.cpp @@ -1438,7 +1438,7 @@ class CKeyDiffApplicator : public IKeyDiffApplicator, public CInterface oldcurr.init(rowsize, oldInput->isVariableWidth()); oldprev.init(rowsize, oldInput->isVariableWidth()); if(tlkGen) - tlkGen->start(); + tlkGen->start(false); } bool readOld(unsigned count) diff --git a/system/jlib/jcomp.cpp b/system/jlib/jcomp.cpp index 4d0f218f253..0e86580ccc8 100644 --- a/system/jlib/jcomp.cpp +++ b/system/jlib/jcomp.cpp @@ -485,7 +485,7 @@ bool CppCompiler::compile() TIME_SECTION(!verbose ? NULL : ">compile"); - Owned pool = createThreadPool("CCompilerWorker", this, this, maxCompileThreads && !reportOnly() ? maxCompileThreads : 1, INFINITE); + Owned pool = createThreadPool("CCompilerWorker", this, false, this, maxCompileThreads && !reportOnly() ? maxCompileThreads : 1, INFINITE); addCompileOption(COMPILE_ONLY[targetCompiler]); bool ret = false; diff --git a/system/jlib/jdebug.cpp b/system/jlib/jdebug.cpp index 3aea22a4add..df6cc1a3802 100644 --- a/system/jlib/jdebug.cpp +++ b/system/jlib/jdebug.cpp @@ -3080,7 +3080,7 @@ void startPerformanceMonitor(unsigned interval, PerfMonMode traceMode, IPerfMonH stopPerformanceMonitor(); if (!MemoryUsageReporter) { MemoryUsageReporter = new CMemoryUsageReporter(interval, traceMode, hook, (traceMode&PerfMonExtended)!=0); - MemoryUsageReporter->start(); + MemoryUsageReporter->start(false); } } diff --git a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index 8b28d627d11..417fcff3404 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -7540,7 +7540,7 @@ class CFileEventWatcher : public CInterfaceOf, implements ITh if (stopped) { stopped = false; - threaded.start(); + threaded.start(false); } } virtual void stop() override diff --git a/system/jlib/jlog.cpp b/system/jlib/jlog.cpp index f21a6e1a5d2..66d94c8135b 100644 --- a/system/jlib/jlog.cpp +++ b/system/jlib/jlog.cpp @@ -293,6 +293,22 @@ void LogMsgJobInfo::deserialize(MemoryBuffer & in) //-------------------------------------------------------------------------------------------------------------------- +const LogMsgJobInfo unknownJob(UnknownJob, UnknownUser); + +static thread_local LogMsgJobInfo tempJobInfo; +const LogMsgJobInfo & checkDefaultJobInfo(const LogMsgJobInfo & _jobInfo) +{ + if (&_jobInfo != &unknownJob) + return _jobInfo; + + //Using a thread local as a temporary is a short term change + //the next step is to remove the parameter and always return a stack object + LogMsgJobInfo & result = tempJobInfo; + result.setJobID(queryThreadedJobId()); + return result; +} + + LogMsg::LogMsg(LogMsgJobId id, const char *job) : category(MSGAUD_programmer, job ? MSGCLS_addid : MSGCLS_removeid), sysInfo(), jobInfo(id), remoteFlag(false) { if (job) @@ -1497,7 +1513,7 @@ void CLogMsgManager::enterQueueingMode() if(processor) return; processor.setown(new MsgProcessor(this)); processor->setBlockingLimit(defaultMsgQueueLimit); - processor->start(); + processor->start(false); } void CLogMsgManager::setQueueBlockingLimit(unsigned lim) @@ -3314,11 +3330,34 @@ IRemoteLogAccess *queryRemoteLogAccessor() ); } -void setDefaultJobId(const char *id, bool threaded) +void setDefaultJobName(const char * name) +{ + setDefaultJobId(theManager->addJobId(name)); +} + + +JobNameTranslator::JobNameTranslator(const char * name) +{ + set(name); +} + +void JobNameTranslator::clear() +{ + if (id) + { + theManager->removeJobId(id); + id = 0; + } +} + +void JobNameTranslator::set(const char * name) { - setDefaultJobId(theManager->addJobId(id), threaded); + clear(); + id = theManager->addJobId(name); } +//--------------------------------------------------------------------------------------------------------------------- + TraceFlags loadTraceFlags(const IPropertyTree *ptree, const std::initializer_list &optNames, TraceFlags dft) { for (auto &o: optNames) diff --git a/system/jlib/jlog.hpp b/system/jlib/jlog.hpp index 2b8149ac163..4a609630b7e 100644 --- a/system/jlib/jlog.hpp +++ b/system/jlib/jlog.hpp @@ -880,7 +880,23 @@ extern jlib_decl void setupContainerizedLogMsgHandler(); //extern jlib_decl ILogMsgManager * createLogMsgManager(); // use with care! (needed by mplog listener facility) -extern jlib_decl void setDefaultJobId(const char *id, bool threaded = false); +extern jlib_decl void setDefaultJobName(const char *id); + +//A class for mapping job names to ids +class jlib_decl JobNameTranslator +{ +public: + JobNameTranslator() = default; + JobNameTranslator(const char * name); + ~JobNameTranslator() { clear(); } + + void clear(); + void set(const char * name); + LogMsgJobId queryId() const { return id; } + +protected: + LogMsgJobId id = 0; +}; // Macros to make logging as simple as possible diff --git a/system/jlib/jpqueue.hpp b/system/jlib/jpqueue.hpp index b958b5d9a13..3b96b419020 100644 --- a/system/jlib/jpqueue.hpp +++ b/system/jlib/jpqueue.hpp @@ -127,6 +127,7 @@ interface IErrorListener: extends IInterface virtual void reportError(const char* err,...)=0; }; +//MORE: This class is barely used - I think it is only used by some ancient windows specific code. class TaskQueue { public: @@ -149,7 +150,7 @@ class TaskQueue if(workers.size()start(); + workers.back()->start(false); } // DBGLOG("%d threads",workers.size()); } diff --git a/system/jlib/jsecrets.cpp b/system/jlib/jsecrets.cpp index f731e97c987..4903eb79f40 100644 --- a/system/jlib/jsecrets.cpp +++ b/system/jlib/jsecrets.cpp @@ -1400,7 +1400,7 @@ void startSecretUpdateThread(const unsigned lookaheadMs) if (!refreshThread) { refreshThread.setown(new SecretRefreshThread()); - refreshThread->start(); + refreshThread->start(false); } } diff --git a/system/jlib/jsmartsock.cpp b/system/jlib/jsmartsock.cpp index e8644827297..ce96fe254d9 100644 --- a/system/jlib/jsmartsock.cpp +++ b/system/jlib/jsmartsock.cpp @@ -238,7 +238,7 @@ CSmartSocketFactory::CSmartSocketFactory(IPropertyTree &service, bool _retry, un if (retry) { retryInterval = _retryInterval; - this->start(); + this->start(false); } } @@ -258,7 +258,7 @@ CSmartSocketFactory::CSmartSocketFactory(const char *_socklist, bool _retry, uns if (retry) { retryInterval = _retryInterval; - this->start(); + this->start(false); } } diff --git a/system/jlib/jsocket.cpp b/system/jlib/jsocket.cpp index 962d328364b..2bb0e7cefb1 100644 --- a/system/jlib/jsocket.cpp +++ b/system/jlib/jsocket.cpp @@ -4969,7 +4969,7 @@ class CSocketSelectHandler: implements ISocketSelectHandler, public CInterface if (!started) { started = true; ForEachItemIn(i,threads) { - threads.item(i).start(); + threads.item(i).start(false); } } @@ -4992,7 +4992,7 @@ class CSocketSelectHandler: implements ISocketSelectHandler, public CInterface CSocketSelectThread *thread = new CSocketSelectThread(selecttrace); threads.append(*thread); if (started) - thread->start(); + thread->start(false); } } void remove(ISocket *sock) @@ -5559,7 +5559,7 @@ class CSocketEpollHandler: implements ISocketSelectHandler, public CInterface started = true; ForEachItemIn(i,threads) { - threads.item(i).start(); + threads.item(i).start(false); } } } @@ -5600,7 +5600,7 @@ class CSocketEpollHandler: implements ISocketSelectHandler, public CInterface CSocketEpollThread *thread = new CSocketEpollThread(epolltrace, hdlPerThrd); threads.append(*thread); if (started) - thread->start(); + thread->start(false); thread->add(sock,mode,nfy); } diff --git a/system/jlib/jsort.cpp b/system/jlib/jsort.cpp index 5b6c5f19de7..4086911b776 100644 --- a/system/jlib/jsort.cpp +++ b/system/jlib/jsort.cpp @@ -338,7 +338,7 @@ class cParQSortBase void start() { for (unsigned i=0;istart(); + threads[i]->start(true); } void subsort(unsigned s, unsigned n) diff --git a/system/jlib/jtask.cpp b/system/jlib/jtask.cpp index feb357aec31..05cc83a8b30 100644 --- a/system/jlib/jtask.cpp +++ b/system/jlib/jtask.cpp @@ -522,7 +522,7 @@ TaskScheduler::TaskScheduler(unsigned _numThreads) : numThreads(_numThreads) for (unsigned i = 0; i < numThreads; i++) processors[i] = new CTaskProcessor(this, i); for (unsigned i2 = 0; i2 < numThreads; i2++) - processors[i2]->start(); + processors[i2]->start(false); } TaskScheduler::~TaskScheduler() diff --git a/system/jlib/jthread.cpp b/system/jlib/jthread.cpp index e77c14bd51b..177e539e1e2 100644 --- a/system/jlib/jthread.cpp +++ b/system/jlib/jthread.cpp @@ -381,8 +381,10 @@ void Thread::captureThreadLoggingInfo() ::saveThreadContext(savedCtx); } -void Thread::start() +void Thread::start(bool inheritThreadContext) { + if (inheritThreadContext) + captureThreadLoggingInfo(); if (alive) { IWARNLOG("Thread::start(%s) - Thread already started!",getName()); PrintStackReport(); @@ -551,7 +553,7 @@ unsigned getThreadCount() CThreadedPersistent::CThreadedPersistent(const char *name, IThreaded *_owner) : athread(*this, name), owner(_owner), state(s_ready) { halt = false; - athread.start(); + athread.start(false); } CThreadedPersistent::~CThreadedPersistent() @@ -593,7 +595,7 @@ void CThreadedPersistent::threadmain() } } -void CThreadedPersistent::start() +void CThreadedPersistent::start(bool inheritThreadContext) { unsigned expected = s_ready; if (!state.compare_exchange_strong(expected, s_running)) @@ -603,6 +605,8 @@ void CThreadedPersistent::start() PrintStackReport(); throw MakeStringExceptionDirect(-1, msg.str()); } + if (inheritThreadContext) + athread.captureThreadLoggingInfo(); sem.signal(); } @@ -682,7 +686,6 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException { idx = _idx; self = _self; - captureThreadLoggingInfo(); } int run() { @@ -712,7 +715,7 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException ready.wait(); if (abortFollowingException && e.isSet()) break; Owned thread = new cdothread(this,shuffled?shuffler->lookup(i):i,ready,e); - thread->start(); + thread->start(true); started.append(*thread.getClear()); } ForEachItemIn(idx, started) @@ -735,7 +738,6 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException { idx = _idx; self = _self; - captureThreadLoggingInfo(); } int run() { @@ -760,7 +762,7 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException for (i=0;i thread = new cdothread(this,i,e); - thread->start(); + thread->start(true); started.append(*thread.getClear()); } @@ -815,21 +817,23 @@ class CPooledThreadWrapper; class CThreadPoolBase { public: - CThreadPoolBase() {} + CThreadPoolBase(bool _inheritThreadContext) + : inheritThreadContext(_inheritThreadContext) + { + } virtual ~CThreadPoolBase() {} protected: friend class CPooledThreadWrapper; IExceptionHandler *exceptionHandler; CriticalSection crit; StringAttr poolname; - int donewaiting; Semaphore donesem; PointerArray waitingsems; UnsignedArray waitingids; bool stopall; + const bool inheritThreadContext; unsigned defaultmax; unsigned targetpoolsize; unsigned delay; - SavedThreadContext savedCtx; Semaphore availsem; std::atomic_uint numrunning{0}; virtual void notifyStarted(CPooledThreadWrapper *item)=0; @@ -850,7 +854,6 @@ class CPooledThreadWrapper: public Thread IPooledThread *_thread) // takes ownership of thread : Thread(StringBuffer("Member of thread pool: ").append(_parent.poolname).str()), parent(_parent) { - savedCtx = parent.savedCtx; thread = _thread; handle = _handle; runningName.set(_parent.poolname); @@ -890,7 +893,8 @@ class CPooledThreadWrapper: public Thread if (parent.stopall) break; } - restoreThreadContext(savedCtx); + if (parent.inheritThreadContext) + restoreThreadContext(savedCtx); parent.notifyStarted(this); try { @@ -990,9 +994,9 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter IThreadFactory *factory; unsigned stacksize; unsigned timeoutOnRelease; - unsigned traceStartDelayPeriod; - unsigned startsInPeriod; - cycle_t startDelayInPeriod; + unsigned traceStartDelayPeriod = 0; + unsigned startsInPeriod = 0; + cycle_t startDelayInPeriod = 0; CCycleTimer overAllTimer; PooledThreadHandle _start(void *param,const char *name, bool noBlock, unsigned timeout=0) @@ -1033,6 +1037,8 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter CPooledThreadWrapper &t = allocThread(); if (name) t.setName(name); + if (inheritThreadContext) + t.captureThreadLoggingInfo(); t.go(param); ret = t.queryHandle(); } @@ -1042,7 +1048,8 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter public: IMPLEMENT_IINTERFACE; - CThreadPool(IThreadFactory *_factory,IExceptionHandler *_exceptionHandler,const char *_poolname,unsigned _defaultmax, unsigned _delay, unsigned _stacksize, unsigned _timeoutOnRelease, unsigned _targetpoolsize) + CThreadPool(IThreadFactory *_factory,bool _inheritThreadContext, IExceptionHandler *_exceptionHandler,const char *_poolname,unsigned _defaultmax, unsigned _delay, unsigned _stacksize, unsigned _timeoutOnRelease, unsigned _targetpoolsize) + : CThreadPoolBase(_inheritThreadContext) { poolname.set(_poolname); factory = LINK(_factory); @@ -1056,9 +1063,6 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter stacksize = _stacksize; timeoutOnRelease = _timeoutOnRelease; targetpoolsize = _targetpoolsize?_targetpoolsize:defaultmax; - traceStartDelayPeriod = 0; - startsInPeriod = 0; - startDelayInPeriod = 0; } ~CThreadPool() @@ -1104,7 +1108,7 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew()); if (stacksize) ret.setStackSize(stacksize); - ret.start(); + ret.start(false); threadwrappers.append(ret); return ret; } @@ -1282,16 +1286,12 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter } return false; } - void captureThreadLoggingInfo() - { - ::saveThreadContext(savedCtx); - } }; -IThreadPool *createThreadPool(const char *poolname,IThreadFactory *factory,IExceptionHandler *exceptionHandler,unsigned defaultmax, unsigned delay, unsigned stacksize, unsigned timeoutOnRelease, unsigned targetpoolsize) +IThreadPool *createThreadPool(const char *poolname,IThreadFactory *factory,bool inheritThreadContext, IExceptionHandler *exceptionHandler,unsigned defaultmax, unsigned delay, unsigned stacksize, unsigned timeoutOnRelease, unsigned targetpoolsize) { - return new CThreadPool(factory,exceptionHandler,poolname,defaultmax,delay,stacksize,timeoutOnRelease,targetpoolsize); + return new CThreadPool(factory,inheritThreadContext,exceptionHandler,poolname,defaultmax,delay,stacksize,timeoutOnRelease,targetpoolsize); } //======================================================================================================= @@ -2194,7 +2194,7 @@ protected: friend class PipeWriterThread; forkthread.clear(); } forkthread.setown(new cForkThread(this)); - forkthread->start(); + forkthread->start(true); bool joined = false; { CriticalUnblock unblock(sect); @@ -2216,7 +2216,7 @@ protected: friend class PipeWriterThread; delete stderrbufferthread; } stderrbufferthread = new cStdErrorBufferThread(stderrbufsize,hError,sect); - stderrbufferthread->start(); + stderrbufferthread->start(true); } return true; } @@ -2523,7 +2523,9 @@ class CWorkQueueThread: implements IWorkQueueThread, public CInterface } if (!work) break; + try { + //If the thread context needs to be preserved - it should be done inside the IWorkQueueItem implementation. work->execute(); work->Release(); } @@ -2554,7 +2556,7 @@ class CWorkQueueThread: implements IWorkQueueThread, public CInterface CriticalBlock block(crit); if (!worker) { worker.setown(new cWorkerThread(this,crit,persisttime)); - worker->start(); + worker->start(false); } worker->queue.enqueue(packet); worker->sem.signal(); @@ -2673,21 +2675,19 @@ void PerfTracer::dostop() } //--------------------------------------------------------------------------------------------------------------------- -#include "jlog.hpp" -static LogMsgJobInfo globalDefaultJobInfo(UnknownJob, UnknownUser); +//Global defaults used to initialize thread local variables +static TraceFlags defaultTraceFlags = TraceFlags::Standard; // NOTE - extern thread_local variables are very inefficient - don't be tempted to expose the variables directly -static TraceFlags defaultTraceFlags = TraceFlags::Standard; -static thread_local LogMsgJobInfo defaultJobInfo; +static thread_local LogMsgJobId defaultJobId = UnknownJob; static thread_local TraceFlags threadTraceFlags = TraceFlags::Standard; static thread_local const IContextLogger *default_thread_logctx = nullptr; -const LogMsgJobInfo unknownJob(UnknownJob, UnknownUser); - void saveThreadContext(SavedThreadContext & saveCtx) { + saveCtx.jobId = defaultJobId; saveCtx.logctx = default_thread_logctx; saveCtx.traceFlags = threadTraceFlags; } @@ -2697,25 +2697,20 @@ void restoreThreadContext(const SavedThreadContext & saveCtx) // Note - as implemented the thread default job info is determined by what the global one was when the thread was created. // There is an alternative interpretation, that an unset thread-local one should default to whatever the global one is at the time the thread one is used. // In practice I doubt there's a lot of difference as global one is likely to be set once at program startup - defaultJobInfo = globalDefaultJobInfo; + defaultJobId = saveCtx.jobId; default_thread_logctx = saveCtx.logctx; threadTraceFlags = saveCtx.traceFlags; } -const LogMsgJobInfo & checkDefaultJobInfo(const LogMsgJobInfo & _jobInfo) + +LogMsgJobId queryThreadedJobId() { - if (&_jobInfo == &unknownJob) - { - return defaultJobInfo; - } - return _jobInfo; + return defaultJobId; } -void setDefaultJobId(LogMsgJobId id, bool threaded) +void setDefaultJobId(LogMsgJobId id) { - if (!threaded) - globalDefaultJobInfo.setJobID(id); - defaultJobInfo.setJobID(id); + defaultJobId = id; } const IContextLogger * queryThreadedContextLogger() diff --git a/system/jlib/jthread.hpp b/system/jlib/jthread.hpp index f49c9efd312..3390dacd27d 100644 --- a/system/jlib/jthread.hpp +++ b/system/jlib/jthread.hpp @@ -34,10 +34,12 @@ #endif // Functions used to reset thread-local context variables, when a threadpool starts +typedef unsigned __int64 LogMsgJobId; interface IContextLogger; struct jlib_decl SavedThreadContext { const IContextLogger * logctx = nullptr; + LogMsgJobId jobId = (LogMsgJobId)-1; TraceFlags traceFlags = queryDefaultTraceFlags(); }; @@ -48,7 +50,7 @@ extern void saveThreadContext(SavedThreadContext & saveCtx); interface jlib_decl IThread : public IInterface { - virtual void start() = 0; + virtual void start(bool inheritThreadContext) = 0; virtual int run() = 0; }; @@ -134,7 +136,7 @@ friend class CThreadedPersistent; bool join(unsigned timeout=INFINITE); void captureThreadLoggingInfo(); // Capture current thread logging context to be used by this thread when started - virtual void start(); + virtual void start(bool inheritThreadContext); virtual void startRelease(); StringBuffer &getInfo(StringBuffer &str); @@ -168,7 +170,7 @@ class CThreaded : public Thread public: inline CThreaded(const char *name, IThreaded *_owner) : Thread(name), owner(_owner) { } inline CThreaded(const char *name) : Thread(name) { owner = NULL; } - inline void init(IThreaded *_owner) { owner = _owner; start(); } + inline void init(IThreaded *_owner, bool inheritThreadContext) { owner = _owner; start(inheritThreadContext); } virtual int run() { owner->threadmain(); return 1; } }; @@ -192,7 +194,7 @@ class jlib_decl CThreadedPersistent public: CThreadedPersistent(const char *name, IThreaded *_owner); ~CThreadedPersistent(); - void start(); + void start(bool inheritThreadContext); bool join(unsigned timeout, bool throwException = true); }; @@ -279,13 +281,13 @@ interface IThreadPool : extends IInterface virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available - virtual void captureThreadLoggingInfo() = 0; // Capture current thread logging context to be used by thread in pool when started }; extern jlib_decl IThreadPool *createThreadPool( const char *poolname, // trace name of pool IThreadFactory *factory, // factory for creating new thread instances - IExceptionHandler *exceptionHandler=NULL, // optional exception handler + bool inheritThreadContext, // Are threads run independent of the calling context(false), or within the calling context(true) + IExceptionHandler *exceptionHandler, // optional exception handler unsigned defaultmax=50, // maximum number of threads before starts blocking unsigned delay=1000, // maximum delay on each block unsigned stacksize=0, // stack size (bytes) 0 is default @@ -374,11 +376,8 @@ extern jlib_decl IWorkQueueThread *createWorkQueueThread(unsigned persisttime=10 //--- Functions that manage the current thread state =- for context, tracing, spans etc. -typedef unsigned __int64 LogMsgJobId; -class LogMsgJobInfo; - -const LogMsgJobInfo & checkDefaultJobInfo(const LogMsgJobInfo & _jobInfo); -extern jlib_decl void setDefaultJobId(LogMsgJobId id, bool threaded = false); +extern jlib_decl LogMsgJobId queryThreadedJobId(); +extern jlib_decl void setDefaultJobId(LogMsgJobId id); extern const IContextLogger * queryThreadedContextLogger(); diff --git a/system/mp/mpcomm.cpp b/system/mp/mpcomm.cpp index 4c5d696f09d..b592cf3e070 100644 --- a/system/mp/mpcomm.cpp +++ b/system/mp/mpcomm.cpp @@ -472,7 +472,7 @@ class CMPConnectThread: public Thread void start() { stopped = false; - threaded.init(this); + threaded.init(this, false); } void stop() { @@ -2330,10 +2330,10 @@ void CMPConnectThread::startPort(unsigned short port) } }; Owned factory = new CMPConnectThreadFactory(*this); - threadPool.setown(createThreadPool("MPConnectPool", factory, nullptr, acceptThreadPoolSize, INFINITE)); + threadPool.setown(createThreadPool("MPConnectPool", factory, false, nullptr, acceptThreadPoolSize, INFINITE)); slowClientProcessor.start(); } - Thread::start(); + Thread::start(false); } // only returns true if !failOnTimeout and times out @@ -2654,7 +2654,7 @@ CMPServer::CMPServer(unsigned __int64 _role, unsigned _port, bool _listen) broadcastpackethandler = new BroadcastPacketHandler; // TAG_SYS_BCAST userpackethandler = new UserPacketHandler(this); // default notifyclosedthread = new CMPNotifyClosedThread(this); - notifyclosedthread->start(); + notifyclosedthread->start(false); selecthandler->start(); rettag = (int)TAG_REPLY_BASE; // NB negative diff --git a/system/mp/mplog.cpp b/system/mp/mplog.cpp index efb0e3765e5..9d3a466d1ba 100644 --- a/system/mp/mplog.cpp +++ b/system/mp/mplog.cpp @@ -75,7 +75,7 @@ CLogMsgLinkToChild::CLogMsgLinkToChild(MPLogId _cid, MPLogId _pid, INode * _chil : childNode(_childNode), cid(_cid), pid(_pid), connected(_connected) { receiverThread.setown(new LogMsgLogReceiverThread(cid, childNode)); - receiverThread->start(); + receiverThread->start(false); } CLogMsgLinkToChild::~CLogMsgLinkToChild() @@ -280,7 +280,7 @@ bool disconnectLogMsgManagerFromChildOwn(INode * childNode) void startLogMsgChildReceiver() { childReceiver = new LogMsgChildReceiverThread(); - childReceiver->start(); + childReceiver->start(false); } // CHILD-SIDE CLASSES @@ -559,7 +559,7 @@ bool disconnectLogMsgManagerFromParentOwn(INode * parentNode) void startLogMsgParentReceiver() { parentReceiver = new LogMsgParentReceiverThread(); - parentReceiver->start(); + parentReceiver->start(false); } // MISC. HELPER FUNCTION diff --git a/system/mp/mplog.ipp b/system/mp/mplog.ipp index a0150d650c9..2f3e91c15cb 100644 --- a/system/mp/mplog.ipp +++ b/system/mp/mplog.ipp @@ -142,7 +142,7 @@ public: unsigned queryMessageFields() const { return MSGFIELD_all; } void setMessageFields(unsigned _fields = MSGFIELD_all) {} ILogMsgFilter * receiveFilter() const; - void startReceiver() { receiverThread->start(); } + void startReceiver() { receiverThread->start(false); } void connect(); void disconnect(); bool queryConnected() const { return connected; } diff --git a/system/mp/mputil.hpp b/system/mp/mputil.hpp index 11ef128556f..835f896d5e1 100644 --- a/system/mp/mputil.hpp +++ b/system/mp/mputil.hpp @@ -50,7 +50,7 @@ class CMessageHandler: public CInterface, public IThreadFactory parent = _parent; handler = _handler; name = strdup(_name); - pool = createThreadPool(name,this,exceptionHandler,maxthreads,lowThreadsDelay,0,timeoutOnRelease); // this will cause this to be linked + pool = createThreadPool(name,this,false,exceptionHandler,maxthreads,lowThreadsDelay,0,timeoutOnRelease); // this will cause this to be linked hasexceptionhandler = exceptionHandler!=NULL; } ~CMessageHandler() diff --git a/system/mp/test/mptest.cpp b/system/mp/test/mptest.cpp index 23ffd37f714..399bc6ba907 100644 --- a/system/mp/test/mptest.cpp +++ b/system/mp/test/mptest.cpp @@ -488,7 +488,7 @@ void MultiTest(ICommunicator *_comm) Owned comm; comm.set(_comm); - server.start(); + server.start(false); CMessageBuffer mb; CRandomBuffer *buff = new CRandomBuffer(); @@ -675,7 +675,7 @@ void MPAlltoAll(IGroup *group, ICommunicator *mpicomm, size32_t buffsize=0, unsi unsigned startTime = msTick(); - sender.start(); + sender.start(false); // --------- @@ -982,7 +982,7 @@ void MPMultiMTSendRecv(ICommunicator* comm, int counter) } for(int i=0;istart(); + workers[i]->start(false); } for(int i=0;istart(); + thrds[j]->start(false); for(int k = 0; k < numthrds; k++) thrds[k]->join(); } diff --git a/system/xmllibtest/xmllibtest.cpp b/system/xmllibtest/xmllibtest.cpp index 95e6b7793d5..0fabc9e6efe 100644 --- a/system/xmllibtest/xmllibtest.cpp +++ b/system/xmllibtest/xmllibtest.cpp @@ -260,9 +260,9 @@ void doTransformUseThead() } for(i = 0; i < NUMTHREADS; i++) { - t1s[i]->start(); - t2s[i]->start(); - t3s[i]->start(); + t1s[i]->start(false); + t2s[i]->start(false); + t3s[i]->start(false); } for(i = 0; i < NUMTHREADS; i++) diff --git a/testing/unittests/dalitests.cpp b/testing/unittests/dalitests.cpp index 44b471d0ae5..042546a3995 100644 --- a/testing/unittests/dalitests.cpp +++ b/testing/unittests/dalitests.cpp @@ -851,7 +851,7 @@ class CDaliSDSStressTests : public CppUnit::TestFixture } void start() { - threaded.init(this); + threaded.init(this, false); } void join() { @@ -1069,7 +1069,7 @@ class CDaliSDSStressTests : public CppUnit::TestFixture id[i] = querySDS().subscribe(s.str(),*sub,false,true); } count = 0; - start(); + start(false); } virtual int run() @@ -2627,7 +2627,7 @@ class CDaliDFSRetrySlowTests : public CppUnit::TestFixture PROGLOG("Unlocking file: %s", fileName.get()); } - void start() { threaded.start(); } + void start() { threaded.start(false); } }; void testDFSAddFailReAdd() diff --git a/testing/unittests/jlibtests.cpp b/testing/unittests/jlibtests.cpp index d6406999973..62224a93fc2 100644 --- a/testing/unittests/jlibtests.cpp +++ b/testing/unittests/jlibtests.cpp @@ -904,7 +904,7 @@ class JlibSetTestStress : public JlibSetTest setValue = !initial; clearValue = initial; } - void start() { threaded.start(); } + void start() { threaded.start(false); } void join() { threaded.join(); @@ -1618,14 +1618,14 @@ class JlibReaderWriterTestTiming : public CppUnit::TestFixture for (unsigned i2 = 0; i2 < numConsumers; i2++) { consumers[i2] = new Reader(queue, stopSem, readerWork); - consumers[i2]->start(); + consumers[i2]->start(false); } WriterBase * * producers = new WriterBase *[numProducers]; for (unsigned i1 = 0; i1 < numProducers; i1++) { producers[i1] = new Writer(queue, sizePerProducer, buffer + i1 * sizePerProducer, startSem, writerDoneSem, writerWork); - producers[i1]->start(); + producers[i1]->start(false); } cycle_t startTime = get_cycles_now(); @@ -3010,7 +3010,7 @@ class AtomicTimingTest : public CppUnit::TestFixture { LockTestThread * next = new LockTestThread(startSem, endSem, lock, value1, lock, extraValues, numIterations); threads.append(*next); - next->start(); + next->start(false); } cycle_t startCycles = get_cycles_now(); diff --git a/testing/unittests/unittests.cpp b/testing/unittests/unittests.cpp index b5ba68369af..8e0f4efd089 100644 --- a/testing/unittests/unittests.cpp +++ b/testing/unittests/unittests.cpp @@ -764,9 +764,9 @@ class ThreadedPersistStressTest : public CppUnit::TestFixture CThreadedPersistent thread1("1", &t1), thread2("2", &t2), thread3("3", &t3); for (unsigned i = 0; i < iters; i++) { - thread1.start(); - thread2.start(); - thread3.start(); + thread1.start(false); + thread2.start(false); + thread3.start(false); ret = call_from_thread(count); thread1.join(INFINITE); thread2.join(INFINITE); @@ -792,9 +792,9 @@ class ThreadedPersistStressTest : public CppUnit::TestFixture CThreaded tthread1("1", &t1), tthread2("2", &t2), tthread3("3", &t3); for (unsigned i = 0; i < iters; i++) { - tthread1.start(); - tthread2.start(); - tthread3.start(); + tthread1.start(false); + tthread2.start(false); + tthread3.start(false); ret = call_from_thread(count); tthread1.join(); tthread2.join(); @@ -828,9 +828,9 @@ class ThreadedPersistStressTest : public CppUnit::TestFixture CPersistentTask task1("1", &t1), task2("2", &t2), task3("3", &t3); for (unsigned i = 0; i < iters; i++) { - task1.start(); - task2.start(); - task3.start(); + task1.start(false); + task2.start(false); + task3.start(false); ret = call_from_thread(count); task1.join(INFINITE); task2.join(INFINITE); @@ -844,9 +844,9 @@ class ThreadedPersistStressTest : public CppUnit::TestFixture MyThread thread1(count), thread2(count), thread3(count); for (unsigned i = 0; i < iters; i++) { - thread1.start(); - thread2.start(); - thread3.start(); + thread1.start(false); + thread2.start(false); + thread3.start(false); ret = call_from_thread(count); thread1.join(INFINITE); thread2.join(INFINITE); @@ -867,9 +867,9 @@ class ThreadedPersistStressTest : public CppUnit::TestFixture threads.append(*thread2); threads.append(*thread3); - thread1->start(); - thread2->start(); - thread3->start(); + thread1->start(false); + thread2->start(false); + thread3->start(false); ret = call_from_thread(count); thread1->join(INFINITE); thread2->join(INFINITE); @@ -1020,9 +1020,9 @@ class RelaxedAtomicTimingTest : public CppUnit::TestFixture t1b(count, ra[0], lock[0], mode), t2b(count, ra[1], lock[1], mode), t3b(count, ra[2], lock[2], mode), t1c(count, ra[0], lock[0], mode), t2c(count, ra[100], lock[100], mode), t3c(count, ra[200], lock[200], mode);; DBGLOG("Testing RelaxedAtomics (test mode %u)", mode); - t1a.start(); - t2a.start(); - t3a.start(); + t1a.start(false); + t2a.start(false); + t3a.start(false); t1a.join(); t2a.join(); t3a.join(); @@ -1030,9 +1030,9 @@ class RelaxedAtomicTimingTest : public CppUnit::TestFixture for (int a = 0; a < 201; a++) ra[a] = 0; timer.reset(); - t1b.start(); - t2b.start(); - t3b.start(); + t1b.start(false); + t2b.start(false); + t3b.start(false); t1b.join(); t2b.join(); t3b.join(); @@ -1040,9 +1040,9 @@ class RelaxedAtomicTimingTest : public CppUnit::TestFixture for (int a = 0; a < 201; a++) ra[a] = 0; timer.reset(); - t1c.start(); - t2c.start(); - t3c.start(); + t1c.start(false); + t2c.start(false); + t3c.start(false); t1c.join(); t2c.join(); t3c.join(); diff --git a/thorlcr/activities/funnel/thfunnelslave.cpp b/thorlcr/activities/funnel/thfunnelslave.cpp index faa4628195a..91deee8d18f 100644 --- a/thorlcr/activities/funnel/thfunnelslave.cpp +++ b/thorlcr/activities/funnel/thfunnelslave.cpp @@ -59,7 +59,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface void start() { // NB don't start in constructor - threaded.start(); + threaded.start(true); } void stop() { diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index 6413f395132..1ce8551fd34 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -496,7 +496,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl stoppedTargets = 0; dedupSamples = dedupSuccesses = 0; doDedup = owner.doDedup; - writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000)); + writerPool.setown(createThreadPool("HashDist writer pool", this, true, this, owner.writerPoolSize, 5*60*1000)); self = owner.activity->queryJobChannel().queryMyRank()-1; sendersFinished = new std::atomic[owner.numnodes]; @@ -943,7 +943,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl { parent = _parent; } - void start() { threaded.start(); } + void start() { threaded.start(true); } void join(unsigned timeout=INFINITE) { threaded.join(timeout); } void stop() { @@ -967,7 +967,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl { parent = _parent; } - void start() { threaded.start(); } + void start() { threaded.start(true); } void join(unsigned timeout=INFINITE) { threaded.join(timeout); } // IThreaded impl. virtual void threadmain() override @@ -1973,7 +1973,7 @@ class CRowPullDistributor: public CDistributorBase stopped = false; stopping = false; txthread = new cTxThread(*this); - txthread->start(); + txthread->start(true); } virtual void join() // probably does nothing diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp index 481abecda01..8c6f06ced04 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp @@ -610,7 +610,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem CKeyedFetchResultProcessor(CKeyedJoinSlave &_owner, ICommunicator &_comm, mptag_t _mpTag) : threaded("CKeyedFetchResultProcessor", this), owner(_owner), comm(_comm), resultMpTag(_mpTag) { aborted = false; - threaded.start(); + threaded.start(true); } ~CKeyedFetchResultProcessor() { @@ -722,7 +722,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem CKeyedFetchRequestProcessor(CKeyedJoinSlave &_owner, ICommunicator &_comm, mptag_t _requestMpTag, mptag_t _resultMpTag) : threaded("CKeyedFetchRequestProcessor", this), owner(_owner), comm(_comm), requestMpTag(_requestMpTag), resultMpTag(_resultMpTag) { aborted = false; - threaded.start(); + threaded.start(true); unsigned expectedFormatCrc = owner.helper->getDiskFormatCrc(); unsigned projectedFormatCrc = owner.helper->getProjectedFormatCrc(); IOutputMetaData *projectedFormat = owner.helper->queryProjectedDiskRecordSize(); @@ -946,7 +946,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem requestProcessor = new CKeyedFetchRequestProcessor(owner, owner.queryJobChannel().queryJobComm(), requestMpTag, resultMpTag); // remote receive of fetch fpos' resultProcessor = new CKeyedFetchResultProcessor(owner, owner.queryJobChannel().queryJobComm(), resultMpTag); // asynchronously receiving results back - threaded.start(); + threaded.start(true); } ~CKeyedFetchHandler() { @@ -1587,7 +1587,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem queueSize = _queueSizestart(NULL); diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index 59997c87a34..36b53ed6f1e 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -1114,7 +1114,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem limiter->dec(); // normally handled at end of thread return; } - threaded.start(); + threaded.start(true); break; } else if (state == ts_stopping) @@ -2125,7 +2125,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } void start() { - threaded.start(); + threaded.start(true); } void join() { diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index ae5dabcb914..47d4835213f 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -118,7 +118,7 @@ class CBroadcaster : public CSimpleInterface void start() { aborted = false; - threaded.start(); + threaded.start(true); } void abort(bool join) { @@ -188,7 +188,7 @@ class CBroadcaster : public CSimpleInterface { aborted = false; exception.clear(); - threaded.start(); + threaded.start(true); } void abort(bool join) { @@ -556,7 +556,7 @@ class CMarker chunkUnique = 0; } rowidx_t getUnique() const { return chunkUnique; } - void start() { threaded.start(); } + void start() { threaded.start(true); } void join() { threaded.join(); } // IThreaded virtual void threadmain() override @@ -860,7 +860,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, stopped = false; clearQueue(); exception.clear(); - threaded.start(); + threaded.start(true); } void abort() { diff --git a/thorlcr/activities/loop/thloopslave.cpp b/thorlcr/activities/loop/thloopslave.cpp index 356ac36023f..722358f0e4d 100644 --- a/thorlcr/activities/loop/thloopslave.cpp +++ b/thorlcr/activities/loop/thloopslave.cpp @@ -163,7 +163,7 @@ class CLoopSlaveActivity : public CLoopSlaveActivityBase stopped = true; stopping = false; smartbuf.setown(createSmartInMemoryBuffer(activity, activity, SMALL_SMART_BUFFER_SIZE)); - threaded.init(this); + threaded.init(this, true); } ~CNextRowFeeder() { diff --git a/thorlcr/activities/merge/thmergeslave.cpp b/thorlcr/activities/merge/thmergeslave.cpp index 1e59f5a7bd5..3dbfed026af 100644 --- a/thorlcr/activities/merge/thmergeslave.cpp +++ b/thorlcr/activities/merge/thmergeslave.cpp @@ -236,7 +236,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity } delete [] intertags; provider.init(this,queryRowSerializer(),intertag); - provider.start(); + provider.start(true); if (!streams.ordinality()) return NULL; if (streams.ordinality()==1) diff --git a/thorlcr/activities/msort/thsortu.cpp b/thorlcr/activities/msort/thsortu.cpp index fdb9ed222ad..4583b6f4485 100644 --- a/thorlcr/activities/msort/thsortu.cpp +++ b/thorlcr/activities/msort/thsortu.cpp @@ -1752,8 +1752,8 @@ class CMultiCoreJoinHelper: public CMultiCoreJoinHelperBase return false; stopped = false; for (unsigned i=0;istart(); - reader.start(); + workers[i]->start(true); + reader.start(true); return true; } virtual const void *nextRow() @@ -1948,8 +1948,8 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase workqueue.setLimit(numworkers+1); rowWriter.setown(multiWriter->getWriter()); for (unsigned i=0;istart(); - reader.start(); + workers[i]->start(true); + reader.start(true); return true; } virtual const void *nextRow() diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index 7d76cdaccdf..56c5dbe0c91 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -126,7 +126,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf void start() { stopped = false; - threaded.start(); + threaded.start(true); } virtual void stop() { diff --git a/thorlcr/activities/piperead/thprslave.cpp b/thorlcr/activities/piperead/thprslave.cpp index 6f8b260ce98..400a27797c0 100644 --- a/thorlcr/activities/piperead/thprslave.cpp +++ b/thorlcr/activities/piperead/thprslave.cpp @@ -404,7 +404,7 @@ class CPipeThroughSlaveActivity : public CPipeSlaveBase openPipe(pipeProgram, "PIPETHROUGH"); } pipeWriter = new PipeWriterThread(*this); - pipeWriter->start(); + pipeWriter->start(true); } CATCH_NEXTROW() { diff --git a/thorlcr/activities/project/thprojectslave.cpp b/thorlcr/activities/project/thprojectslave.cpp index 85042aeef46..5c9ce6ee953 100644 --- a/thorlcr/activities/project/thprojectslave.cpp +++ b/thorlcr/activities/project/thprojectslave.cpp @@ -176,7 +176,7 @@ class CPrefetchProjectSlaveActivity : public CSlaveActivity eoi = true; return NULL; } - void start() { recordCount = 0; full = blocked = eoq = eoi = stopped = false; eog = true; threaded.start(); } + void start() { recordCount = 0; full = blocked = eoq = eoi = stopped = false; eog = true; threaded.start(true); } void stop() { stopped = true; diff --git a/thorlcr/activities/thactivityutil.cpp b/thorlcr/activities/thactivityutil.cpp index c77f5ede002..bb8bbb10279 100644 --- a/thorlcr/activities/thactivityutil.cpp +++ b/thorlcr/activities/thactivityutil.cpp @@ -159,7 +159,7 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf public: CNotifyThread(CRowStreamLookAhead &_owner) : threaded("Lookahead-CNotifyThread"), owner(_owner) { - threaded.init(this); + threaded.init(this, true); } ~CNotifyThread() { @@ -236,7 +236,7 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf #endif started = true; running = true; - thread.start(); + thread.start(true); startSem.wait(); } // IEngineRowStream diff --git a/thorlcr/activities/thactivityutil.ipp b/thorlcr/activities/thactivityutil.ipp index ee6ff4ea159..3e90d92d6aa 100644 --- a/thorlcr/activities/thactivityutil.ipp +++ b/thorlcr/activities/thactivityutil.ipp @@ -98,7 +98,7 @@ class CAsyncCall : implements IThreaded std::function func; public: CAsyncCall(std::function _func) : threaded("CAsyncCall", this), func(_func) { } - void start() { threaded.start(); } + void start() { threaded.start(true); } void wait() { threaded.join(); } // IThreaded virtual void threadmain() override { func(); } diff --git a/thorlcr/activities/wuidwrite/thwuidwrite.cpp b/thorlcr/activities/wuidwrite/thwuidwrite.cpp index 458f36575c1..2e5aeb11077 100644 --- a/thorlcr/activities/wuidwrite/thwuidwrite.cpp +++ b/thorlcr/activities/wuidwrite/thwuidwrite.cpp @@ -192,7 +192,7 @@ class CWorkUnitWriteLocalActivityMaster : public CWorkUnitWriteMasterBase CMessageHandler(CWorkUnitWriteLocalActivityMaster &_act) : act(_act), threaded("CWorkUnitWriteLocalActivityMaster::CMessageHandler") { started = waiting = stopped = false; - threaded.init(this); + threaded.init(this, true); } ~CMessageHandler() { diff --git a/thorlcr/graph/thgraph.cpp b/thorlcr/graph/thgraph.cpp index 6a301336a01..056d9f82d58 100644 --- a/thorlcr/graph/thgraph.cpp +++ b/thorlcr/graph/thgraph.cpp @@ -2472,7 +2472,7 @@ class CGraphExecutor : implements IGraphExecutor, public CInterface waitOnRunning = 0; stopped = false; factory = new CGraphExecutorFactory(); - graphPool.setown(createThreadPool("CGraphExecutor pool", factory, &jobChannel, limit)); + graphPool.setown(createThreadPool("CGraphExecutor pool", factory, true, &jobChannel, limit)); } ~CGraphExecutor() { diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 2feb711afad..665c8778a39 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -103,7 +103,7 @@ class CFatalHandler : public CTimeoutTrigger, implements IFatalHandler CSlaveMessageHandler::CSlaveMessageHandler(CJobMaster &_job, mptag_t _mptag) : threaded("CSlaveMessageHandler"), job(_job), mptag(_mptag) { stopped = false; - threaded.init(this); + threaded.init(this, false); childGraphInitTimeout = job.getOptUInt(THOROPT_CHILD_GRAPH_INIT_TIMEOUT, 5*60) * 1000; // default 5 minutes } @@ -545,7 +545,7 @@ void CMasterActivity::startProcess(bool async) if (async) { asyncStart = true; - threaded.start(); + threaded.start(true); } else threadmain(); @@ -2016,7 +2016,7 @@ void CJobMaster::pause(bool doAbort) public: CAbortThread(CJobMaster &_owner, IException *_exception) : owner(_owner), exception(_exception), threaded("SaveSpillThread", this) { - threaded.start(); + threaded.start(true); } ~CAbortThread() { diff --git a/thorlcr/master/mawatchdog.cpp b/thorlcr/master/mawatchdog.cpp index 3290bb5da12..88744ed8b91 100644 --- a/thorlcr/master/mawatchdog.cpp +++ b/thorlcr/master/mawatchdog.cpp @@ -88,7 +88,7 @@ void CMasterWatchdog::start() { PROGLOG("Starting watchdog"); stopped = false; - threaded.init(this); + threaded.init(this, false); #ifdef _WIN32 threaded.adjustPriority(+1); // it is critical that watchdog packets get through. #endif diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index cbd7f0aa788..1b6ff4ca6a6 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -191,7 +191,7 @@ class CJobManager : public CSimpleInterface, implements IJobManager, implements unsigned defaultThorDebugPort = getFixedPort(getMasterPortBase(), TPORT_debug); port = globals->getPropInt("DebugPort", defaultThorDebugPort); running = true; - threaded.start(); + threaded.start(false); } ~CThorDebugListener() { @@ -460,7 +460,7 @@ class CIdleShutdown : public CSimpleInterface, implements IThreaded Semaphore sem; CThreaded threaded; public: - CIdleShutdown(unsigned _timeout) : timeout(_timeout*60000), threaded("CIdleShutdown") { threaded.init(this); } + CIdleShutdown(unsigned _timeout) : timeout(_timeout*60000), threaded("CIdleShutdown") { threaded.init(this, false); } ~CIdleShutdown() { stop(); threaded.join(); } virtual void threadmain() override { @@ -634,7 +634,7 @@ void CJobManager::run() public: CThorListener(mptag_t _mptag) : threaded("CDaliConnectionValidator"), mptag(_mptag) { - threaded.init(this); + threaded.init(this, false); } ~CThorListener() { stop(); threaded.join(); } void stop() @@ -887,6 +887,10 @@ bool CJobManager::doit(IConstWorkUnit *workunit, const char *graphName, const So StringAttr wuid(workunit->queryWuid()); StringAttr user(workunit->queryUser()); + LogMsgJobId thorJobId = queryLogMsgManager()->addJobId(wuid); + thorJob.setJobID(thorJobId); + setDefaultJobId(thorJobId); + LOG(MCdebugInfo, thorJob, "Processing wuid=%s, graph=%s from agent: %s", wuid.str(), graphName, agentep.getEndpointHostText(s).str()); LOG(MCauditInfo,",Progress,Thor,Start,%s,%s,%s,%s,%s,%s", queryServerStatus().queryProperties()->queryProp("@thorname"), @@ -909,6 +913,11 @@ bool CJobManager::doit(IConstWorkUnit *workunit, const char *graphName, const So user.str(), queryServerStatus().queryProperties()->queryProp("@nodeGroup"), queryServerStatus().queryProperties()->queryProp("@queue")); + + thorJob.setJobID(UnknownJob); + setDefaultJobId(UnknownJob); + queryLogMsgManager()->removeJobId(thorJobId); + if (e.get()) throw e.getClear(); return allDone; } @@ -1243,7 +1252,7 @@ class CDaliConnectionValidator : public CSimpleInterface, implements IThreaded Semaphore poll; CThreaded threaded; public: - CDaliConnectionValidator(unsigned _pollDelay) : threaded("CDaliConnectionValidator") { pollDelay = _pollDelay*1000; stopped = false; threaded.init(this); } + CDaliConnectionValidator(unsigned _pollDelay) : threaded("CDaliConnectionValidator") { pollDelay = _pollDelay*1000; stopped = false; threaded.init(this, false); } ~CDaliConnectionValidator() { stop(); threaded.join(); } virtual void threadmain() override { diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index a0a2f5469ea..748c6011a08 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -87,7 +87,7 @@ class CThorEndHandler : implements IThreaded public: CThorEndHandler() : threaded("CThorEndHandler") { - threaded.init(this); // starts thread + threaded.init(this, false); // starts thread } ~CThorEndHandler() { @@ -161,7 +161,7 @@ class CRegistryServer : public CSimpleInterface { stop(); } - void start() { threaded.init(this); } + void start() { threaded.init(this, false); } void stop() { if (running) diff --git a/thorlcr/msort/tsorts.cpp b/thorlcr/msort/tsorts.cpp index 41d8b798002..266d3a4d740 100644 --- a/thorlcr/msort/tsorts.cpp +++ b/thorlcr/msort/tsorts.cpp @@ -848,7 +848,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements } } #endif - threaded.start(); + threaded.start(true); } ~CThorSorter() { diff --git a/thorlcr/msort/tsorts1.cpp b/thorlcr/msort/tsorts1.cpp index a0810adae20..d5353f973f5 100644 --- a/thorlcr/msort/tsorts1.cpp +++ b/thorlcr/msort/tsorts1.cpp @@ -303,7 +303,7 @@ protected: friend class CSortMerge; void start() { - Thread::start(); + Thread::start(true); } CSortTransferServerThread(ISortSlaveBase &in) diff --git a/thorlcr/slave/backup.cpp b/thorlcr/slave/backup.cpp index 2875af63c0a..43c39a32c30 100644 --- a/thorlcr/slave/backup.cpp +++ b/thorlcr/slave/backup.cpp @@ -216,7 +216,7 @@ class CThorBackupHandler : public CSimpleInterface, implements IBackup, implemen } if (async) { - threaded.init(this); + threaded.init(this, false); sem.signal(); } } diff --git a/thorlcr/slave/slave.cpp b/thorlcr/slave/slave.cpp index fcd1b023e94..6b5bdc378a7 100644 --- a/thorlcr/slave/slave.cpp +++ b/thorlcr/slave/slave.cpp @@ -70,7 +70,7 @@ void ProcessSlaveActivity::beforeDispose() void ProcessSlaveActivity::startProcess(bool async) { if (async) - threaded.start(); + threaded.start(true); else threadmain(); } diff --git a/thorlcr/slave/slavmain.cpp b/thorlcr/slave/slavmain.cpp index c9acc3303a5..049f61b34a7 100644 --- a/thorlcr/slave/slavmain.cpp +++ b/thorlcr/slave/slavmain.cpp @@ -1189,7 +1189,7 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, void setupProcessorPool() { Owned factory = new CProcessorFactory(*this); - processorPool.setown(createThreadPool("KJService processor pool", factory, this, keyLookupMaxProcessThreads, 10000)); + processorPool.setown(createThreadPool("KJService processor pool", factory, true, this, keyLookupMaxProcessThreads, 10000)); processorPool->setStartDelayTracing(60000); } public: @@ -1571,7 +1571,7 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, virtual void start() override { aborted = false; - threaded.start(); + threaded.start(false); } virtual void stop() override { @@ -1735,7 +1735,7 @@ class CJobListener : public CSimpleInterface start(); } ~CVerifyThread() { join(); } - void start() { threaded.start(); } + void start() { threaded.start(false); } void join() { threaded.join(); } virtual void threadmain() override { diff --git a/thorlcr/slave/slwatchdog.cpp b/thorlcr/slave/slwatchdog.cpp index 7860eaeecde..c3dd6078efc 100644 --- a/thorlcr/slave/slwatchdog.cpp +++ b/thorlcr/slave/slwatchdog.cpp @@ -68,7 +68,7 @@ class CGraphProgressHandlerBase : public CInterfaceOf, implement void start() { stopped = false; - threaded.start(); + threaded.start(false); } virtual void beforeDispose() override { diff --git a/thorlcr/slave/thslavemain.cpp b/thorlcr/slave/thslavemain.cpp index 1620955ed99..896a6b042a7 100644 --- a/thorlcr/slave/thslavemain.cpp +++ b/thorlcr/slave/thslavemain.cpp @@ -549,7 +549,7 @@ int main( int argc, const char *argv[] ) CServerThread() : threaded("CServerThread") { dafsInstance.setown(createRemoteFileServer()); - threaded.init(this); + threaded.init(this, false); } ~CServerThread() { diff --git a/thorlcr/thorutil/thmem.cpp b/thorlcr/thorutil/thmem.cpp index 77091fcd426..65d11882106 100644 --- a/thorlcr/thorutil/thmem.cpp +++ b/thorlcr/thorutil/thmem.cpp @@ -2183,7 +2183,7 @@ class cMultiThorResourceMutex: public CSimpleInterface, implements ILargeMemLimi nodeComm.set(&queryNodeComm()); if (nodeComm->queryGroup().rank(queryMyNode())==0) { // master so start thread thread.setown(new cMultiThorResourceMutexThread(*this)); - thread->start(); + thread->start(false); StringBuffer mname("thorres:"); mname.append(groupname); mutex.setown(createDaliMutex(mname.str())); diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index c2598961693..ff98bd8c015 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -1310,7 +1310,7 @@ class CRowServer : public CSimpleInterface, implements IThreaded, implements IRo { fetchBuffSize = DEFAULT_ROWSERVER_BUFF_SIZE; running = true; - threaded.init(this); + threaded.init(this, true); } ~CRowServer() { diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index ea131972dd0..f378ab1e604 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -252,7 +252,7 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded { running = (timeout!=0); if (running) - threaded.start(); + threaded.start(false); } virtual void beforeDispose() override { diff --git a/tools/testsocket/testsocket.cpp b/tools/testsocket/testsocket.cpp index ca531a09dde..221e4361a72 100644 --- a/tools/testsocket/testsocket.cpp +++ b/tools/testsocket/testsocket.cpp @@ -629,7 +629,8 @@ int doSendQuery(const char * ip, unsigned port, const char * base) if (sendToSocket) { Thread * receive = new ReceiveThread(); - receive->start(); + //MORE: The caller should really join this thread before terminating + receive->start(false); receive->Release(); } @@ -800,7 +801,8 @@ int sendQuery(const char * ip, unsigned port, const char * base) runningQueries++; Thread * thread = new QueryThread(ip, port, base); - thread->start(); + //MORE: The caller should really join this thread before terminating + thread->start(false); thread->Release(); if (multiThread && queryAbsDelayMS && !multiThreadMax)