From 5e124d3011f3f66aaff4ed37cdb664645d7344e1 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Tue, 12 Nov 2024 20:08:42 +0000 Subject: [PATCH] HPCC-32945 Add support for queue clients with priorities Signed-off-by: Gavin Halliday --- common/workunit/wujobq.cpp | 137 ++++++++++++++------ common/workunit/wujobq.hpp | 1 + testing/unittests/dalitests.cpp | 217 ++++++++++++++++++++++---------- 3 files changed, 253 insertions(+), 102 deletions(-) diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index cd1f0d5b330..fe20091c17a 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -18,6 +18,7 @@ #include "platform.h" #include +#include #include "limits.h" #include "jlib.hpp" #include "jbuff.hpp" @@ -51,13 +52,12 @@ JobQueues JobQueue @name= @count= @state=active|paused|stopped Edition - Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads) + Client @session= @connected= [@priority=n] @waiting= -- connections and waiting can no longer be > 1 Item* @wuid @owner @node @port @priority @session #endif - class CJobQueueItem: implements IJobQueueItem, public CInterface { int priority; @@ -789,16 +789,17 @@ class CJobQueueConst: public CJobQueueBase class CJobQueue: public CJobQueueBase, implements IJobQueue { public: - sQueueData *activeq; + sQueueData *activeq = nullptr; SessionId sessionid; - unsigned locknest; - bool writemode; - bool connected; + unsigned locknest = 0; + bool writemode = false; + bool connected = false; Owned initiateconv; StringAttr initiatewu; - bool dequeuestop; - bool cancelwaiting; - bool validateitemsessions; + std::atomic isProcessingDequeue = 0; + bool dequeuestop = false; + bool cancelwaiting = false; + bool validateitemsessions = false; class csubs: implements ISDSSubscription, public CInterface { @@ -811,15 +812,21 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) { - CriticalBlock block(parent->crit); + //There is a race condition - a callback may be at this point while the CJobQueue is deleted. + //Adding a critical section in parent makes it much more likely to be hit. + //Ultimately the semaphore should be moved to this class instead + //CriticalBlock block(parent->crit); parent->notifysem.signal(); } - } subs; + }; - IMPLEMENT_IINTERFACE; + Owned subs; + + IMPLEMENT_IINTERFACE_USING(CJobQueueBase); - CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this) + CJobQueue(const char *_qname) : CJobQueueBase(_qname) { + subs.setown(new csubs(this)); activeq = qdata; sessionid = myProcessSession(); validateitemsessions = false; @@ -1037,7 +1044,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } StringBuffer path; path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); - qd->subscriberid = querySDS().subscribe(path.str(), subs, false); + qd->subscriberid = querySDS().subscribe(path.str(), *subs, false); } } @@ -1048,7 +1055,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue if (!qd->subscriberid) { StringBuffer path; path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); - qd->subscriberid = querySDS().subscribe(path.str(), subs, false); + qd->subscriberid = querySDS().subscribe(path.str(), *subs, false); } unsigned e = (unsigned)qd->root->getPropInt("Edition", 1); if (e!=qd->lastWaitEdition) { @@ -1128,7 +1135,24 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } } - sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues) + bool hasHigherPriorityClients(IPropertyTree * queueTree, __uint64 clientPrio, unsigned threshold) + { + unsigned higher = 0; + Owned iter = queueTree->getElements("Client"); + ForEach(*iter) + { + unsigned __int64 priority = iter->query().getPropInt64("@priority", 0); + if (priority > clientPrio) + { + higher++; + if (higher >= threshold) + return true; + } + } + return false; + } + + sQueueData *findbestqueue(bool useprev,int minprio,__uint64 clientPrio,unsigned numqueues,sQueueData **queues) { if (numqueues==0) return NULL; @@ -1139,7 +1163,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue for (unsigned i=0;iroot->getPropInt("@count"); - if (count) { + if (count) + { + if (hasHigherPriorityClients(qd->root, clientPrio, count)) + continue; + int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio; if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) { StringBuffer path; @@ -1160,17 +1188,33 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return best; } - void setWaiting(unsigned numqueues,sQueueData **queues, bool set) + void setWaiting(unsigned numqueues,sQueueData **queues, unsigned __int64 clientPrio, bool set) { for (unsigned i=0; isetPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1)); + //If a non-zero client priority has been specified, add (or remove) it from the list of priorities + if (clientPrio) + { + if (set) + croot->setPropInt64("@priority", clientPrio); + else + croot->removeProp("@priority"); + } } } // 'simple' queuing - IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL) + IJobQueueItem *dodequeue(int minprio, __uint64 clientPrio, unsigned timeout, bool useprev, bool * timedout) { + //If more than one thread is waiting on the queue, then the queue code does not work correctly + //It is undefined which thread the semaphore signal will wake up. + //E.g. there is one thread with a minimum priority of 0, and another with a minimum of 100, and an item of + //priority 50 is queued. If the minimum priority of 100 is woken twice nothing will be dequeued. + //Similar problems occur when the clientPriority is mixed. + if (isProcessingDequeue.exchange(true)) + throw MakeStringException(0, "Multiple concurrent dequeue not supported"); + bool hasminprio=(minprio!=INT_MIN); if (timedout) *timedout = false; @@ -1200,23 +1244,30 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue active.append(qd); } if (stopped==total) + { + isProcessingDequeue.store(false); return NULL; // all stopped + } sQueueData **activeqds = (sQueueData **)active.getArray(); unsigned activenum = active.ordinality(); if (activenum) { - sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds); + sQueueData *bestqd = findbestqueue(useprev,minprio,clientPrio,activenum,activeqds); unsigned count = bestqd?bestqd->root->getPropInt("@count"):0; // load minp from cache - if (count) { - int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio; - if (!hasminprio||checkprio(*bestqd,mpr)) { - block.setRollback(false); - ret = dotake(*bestqd,NULL,true,hasminprio,mpr); - if (ret) // think it must be! - timeout = 0; // so mark that done - else if (!hasminprio) { - WARNLOG("Resetting queue %s",bestqd->qname.get()); - clear(*bestqd); // reset queue as seems to have become out of sync + if (count) + { + if (!hasHigherPriorityClients(bestqd->root, clientPrio, count)) + { + int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio; + if (!hasminprio||checkprio(*bestqd,mpr)) { + block.setRollback(false); + ret = dotake(*bestqd,NULL,true,hasminprio,mpr); + if (ret) // think it must be! + timeout = 0; // so mark that done + else if (!hasminprio) { + WARNLOG("Resetting queue %s",bestqd->qname.get()); + clear(*bestqd); // reset queue as seems to have become out of sync + } } } } @@ -1226,7 +1277,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue block.setRollback(false); } if (!waitingset) { - setWaiting(activenum,activeqds,true); + setWaiting(activenum, activeqds, clientPrio, true); block.commit(); waitingset = true; } @@ -1234,7 +1285,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } if (timeout==0) { if (waitingset) { - setWaiting(activenum,activeqds,false); + setWaiting(activenum, activeqds, clientPrio, false); block.commit(); } if (timedout) @@ -1255,12 +1306,14 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue timeout = 0; } } + + isProcessingDequeue.store(false); return ret; } IJobQueueItem *dequeue(unsigned timeout=INFINITE) { - return dodequeue(INT_MIN,timeout); + return dodequeue(INT_MIN, 0, timeout, false, nullptr); } @@ -1271,13 +1324,18 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue { unsigned timeout = prioritytransitiondelay; bool usePrevPrio = true; - item.setown(dodequeue(minPrio, timeout, usePrevPrio, nullptr)); + item.setown(dodequeue(minPrio, 0, timeout, usePrevPrio, nullptr)); } if (!item) - item.setown(dodequeue(minPrio, timeout-prioritytransitiondelay, false, nullptr)); + item.setown(dodequeue(minPrio, 0, timeout-prioritytransitiondelay, false, nullptr)); return item.getClear(); } + IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE) + { + return dodequeue(INT_MIN, priority, timeout, false, nullptr); + } + void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem { Owned qi = qitem; @@ -1628,6 +1686,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return (state&&(strcmp(state,"stopped")==0)); } + void removeClient(sQueueData & qd, IPropertyTree * croot) + { + qd.root->removeTree(croot); + } + void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued) { Cconnlockblock block(this,false); @@ -1640,7 +1703,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue break; if (validateitemsessions && !validSession(croot)) { Cconnlockblock block(this,true); - qd.root->removeTree(croot); + removeClient(qd, croot); } else { waiting += croot->getPropInt("@waiting"); @@ -1772,7 +1835,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue int minprio = 0; unsigned timeout = prioritytransitiondelay; bool usePrevPrio = true; - item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout)); + item.setown(dodequeue(minprio, 0, timeout, usePrevPrio, &timedout)); } else item.setown(dequeue(INFINITE)); diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index 4fdcda7a837..ed7cc2e2257 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -121,6 +121,7 @@ interface IJobQueue: extends IJobQueueConst // validateitemsessions ensures that all queue items have running session virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0; virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0; + virtual IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)=0; virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release) virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running virtual bool waitStatsChange(unsigned timeout)=0; diff --git a/testing/unittests/dalitests.cpp b/testing/unittests/dalitests.cpp index 2a08b473519..d82f2eb1192 100644 --- a/testing/unittests/dalitests.cpp +++ b/testing/unittests/dalitests.cpp @@ -3430,11 +3430,36 @@ class JobQueueTester : public CppUnit::TestFixture }; + class PriorityJobProcessor : public JobProcessor + { + public: + PriorityJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + __uint64 priority = 0; + for (;;) + { + Owned item = queue->dequeuePriority(priority); + if (!item) + item.setown(queue->dequeue(0, INFINITE, 0)); + bool ret = processItem(item); + if (!ret) + break; + priority = getTimeStampNowValue(); + } + } + }; + enum JobProcessorType { StandardProcessor, ThorProcessor, NewThorProcessor, + PriorityProcessor, }; void testInit() @@ -3449,88 +3474,139 @@ class JobQueueTester : public CppUnit::TestFixture void runTestCase(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults, bool uniqueQueues) { - Owned queue = createJobQueue("JobQueueTester"); - Semaphore startedSem; - Semaphore processedSem; - - CIArrayOf jobProcessors; - for (auto & processor : processors) + try { - JobProcessor * cur = nullptr; - Owned localQueue; - IJobQueue * processorQueue = queue; - if (uniqueQueues) + Owned queue = createJobQueue("JobQueueTester"); + queue->connect(true); + queue->clear(); + + Semaphore startedSem; + Semaphore processedSem; + + CIArrayOf jobProcessors; + for (auto & processor : processors) { - localQueue.setown(createJobQueue("JobQueueTester")); - processorQueue = localQueue; + JobProcessor * cur = nullptr; + Owned localQueue; + IJobQueue * processorQueue = queue; + if (uniqueQueues) + { + localQueue.setown(createJobQueue("JobQueueTester")); + processorQueue = localQueue; + } + + switch (processor) + { + case StandardProcessor: + cur = new StandardJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case ThorProcessor: + cur = new ThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case NewThorProcessor: + cur = new NewThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case PriorityProcessor: + cur = new PriorityJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + default: + UNIMPLEMENTED; + } + jobProcessors.append(*cur); + cur->start(true); } - switch (processor) + for (auto & processor : processors) + startedSem.wait(); + + IArrayOf conversations; + jobQueueStartTick = msTick(); + for (auto & job : jobs) { - case StandardProcessor: - cur = new StandardJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - case ThorProcessor: - cur = new ThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - case NewThorProcessor: - cur = new NewThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - default: - UNIMPLEMENTED; + JobQueueSleep(job.delayMs); + if (traceJobQueue) + DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick()); + Owned item = createJobQueueItem(job.name); + item->setPort(job.processingMs); + item->setPriority(job.priority); + + queue->enqueue(item.getClear()); } - jobProcessors.append(*cur); - cur->start(true); - } - for (auto & processor : processors) - startedSem.wait(); + for (;;) + { + //Wait until all the items have been processed before adding the special end markers + //otherwise the ends will be interpreted as valid items, and may cause the items to + //be dequeued by the wrong thread. + unsigned connected; + unsigned waiting; + unsigned enqueued; + queue->getStats(connected,waiting,enqueued); + if (enqueued == 0) + break; + MilliSleep(100 * tickScaling); + } - IArrayOf conversations; - jobQueueStartTick = msTick(); - for (auto & job : jobs) - { - JobQueueSleep(job.delayMs); - if (traceJobQueue) - DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick()); - Owned item = createJobQueueItem(job.name); - item->setPort(job.processingMs); - item->setPriority(job.priority); + ForEachItemIn(i1, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Add (eoj) @%u", getJobQueueTick()); - queue->enqueue(item.getClear()); - } + //The queue code dedups by "wuid", so we need to add a unique "stop" entry + std::string end = std::string("!") + std::to_string(i1); + Owned item = createJobQueueItem(end.c_str()); + queue->enqueue(item.getClear()); + } - ForEachItemIn(i1, jobProcessors) - { - if (traceJobQueue) - DBGLOG("Add (eoj) @%u", getJobQueueTick()); + ForEachItemIn(i2, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Wait for %u", i2); + jobProcessors.item(i2).join(); + } - //The queue code dedups by "wuid", so we need to add a unique "stop" entry - std::string end = std::string("!") + std::to_string(i1); - Owned item = createJobQueueItem(end.c_str()); - queue->enqueue(item.getClear()); - } + DBGLOG("%s:%s, %ums", name, uniqueQueues ? " unique queues" : "", getJobQueueTick()); + unsigned numProcessors = processors.size(); + ForEachItemIn(i3, jobProcessors) + { + JobProcessor & cur = jobProcessors.item(i3); + DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog()); + } - ForEachItemIn(i2, jobProcessors) - { - if (traceJobQueue) - DBGLOG("Wait for %u", i2); - jobProcessors.item(i2).join(); + if (numProcessors == expectedResults.size()) + { + ForEachItemIn(i3, jobProcessors) + { + JobProcessor & cur = jobProcessors.item(i3); + unsigned matchedPos = numProcessors; + for (unsigned i =0; i < numProcessors; i++) + { + if (streq(expectedResults.begin()[i], cur.queryOutput())) + { + matchedPos = i; + break; + } + } + if (matchedPos == numProcessors) + { + VStringBuffer msg("Test %s: No match for output %u", name, i3); + CPPUNIT_ASSERT_MESSAGE(msg.str(), 0); + } + } + } } - - DBGLOG("%s:%s, %ums", name, uniqueQueues ? " unique queues" : "", getJobQueueTick()); - ForEachItemIn(i3, jobProcessors) + catch (IException * e) { - JobProcessor & cur = jobProcessors.item(i3); - DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog()); -// if (i3 < expectedResults.size()) -// CPPUNIT_ASSERT_EQUAL(std::string(expectedResults.begin()[i3]), std::string(cur.queryOutput())); + StringBuffer msg("Fail: "); + e->errorMessage(msg); + e->Release(); + CPPUNIT_ASSERT_MESSAGE(msg.str(), 0); } } void runTestCaseX2(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults) { - runTestCase(name, jobs, processors, expectedResults, false); + //runTestCase(name, jobs, processors, expectedResults, false); runTestCase(name, jobs, processors, expectedResults, true); } @@ -3620,28 +3696,39 @@ class JobQueueTester : public CppUnit::TestFixture runTestCase("lo hi2 wu, 1 thor", lowHigh2Test, { ThorProcessor }, {}, false); runTestCase("lo hi2 wu, 1 newthor", lowHigh2Test, { NewThorProcessor }, {}, false); runTestCase("drip wu, 1 std", dripFeedTest, { StandardProcessor }, {}, false); - + runTestCase("drip wu, 1 std", dripFeedTest, { PriorityProcessor }, {}, false); } void testDouble() { runTestCaseX2("2 wu, 2 standard", twoWuTest, { StandardProcessor, StandardProcessor }, { "abcd", "ABCD" }); runTestCaseX2("lo hi wu, 2 standard", lowHighTest, { StandardProcessor, StandardProcessor }, { "aBDc" "ACbd" }); - runTestCaseX2("lo hi2 wu, 2 standard", lowHigh2Test, { StandardProcessor, StandardProcessor }, { "a"}); + runTestCaseX2("lo hi2 wu, 2 standard", lowHigh2Test, { StandardProcessor, StandardProcessor }, { }); runTestCaseX2("lo hi2 wu, 2 thor", lowHigh2Test, { ThorProcessor, ThorProcessor }, {}); runTestCaseX2("lo hi2 wu, 2 newthor", lowHigh2Test, { NewThorProcessor, NewThorProcessor }, {}); runTestCaseX2("lo hi3 wu, 2 thor", lowHigh3Test, { ThorProcessor, ThorProcessor }, {}); runTestCaseX2("lo hi3 wu, 2 newthor", lowHigh3Test, { NewThorProcessor, NewThorProcessor }, {}); + runTestCaseX2("lo hi3 wu, 2 prio", lowHigh3Test, { PriorityProcessor, PriorityProcessor }, {}); runTestCaseX2("drip wu, 2 std", dripFeedTest, { StandardProcessor, StandardProcessor }, {}); runTestCaseX2("drip wu, 2 newthor", dripFeedTest, { NewThorProcessor, NewThorProcessor }, {}); + runTestCaseX2("drip wu, 2 prio", dripFeedTest, { PriorityProcessor, PriorityProcessor }, { "abcdefghij", "" }); } void testMany() { runTestCaseX2("drip wu, 3 std", dripFeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); runTestCaseX2("drip2 wu, 3 std", drip2FeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); + runTestCaseX2("drip wu, 3 prio", dripFeedTest, { PriorityProcessor, PriorityProcessor, PriorityProcessor }, { "abcdefghij", "", "" }); + runTestCaseX2("drip2 wu, 3 prio", drip2FeedTest, { PriorityProcessor, PriorityProcessor, PriorityProcessor }, { "acegikmo", "bdfhjln", ""}); } + + //MORE Tests: + //Many requests at a time in waves + //Priority 1,2,3 fixed - not dynamic + //Stopping listening after N to check priorities removed correctly + //Mix standard and priority + //Priority with expiring and gaps to ensure the correct client picks up the items. }; CPPUNIT_TEST_SUITE_REGISTRATION( JobQueueTester );