From f479f0b06fabcd18981f097e9805852f6de340c2 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Tue, 12 Nov 2024 20:08:42 +0000 Subject: [PATCH] HPCC-32945 Add support for queue clients with priorities Signed-off-by: Gavin Halliday --- common/workunit/wujobq.cpp | 159 +++++++++++++++------- common/workunit/wujobq.hpp | 1 + testing/unittests/dalitests.cpp | 227 +++++++++++++++++++++----------- 3 files changed, 258 insertions(+), 129 deletions(-) diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index ecd34fbdee5..cec26212180 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -18,6 +18,7 @@ #include "platform.h" #include +#include #include "limits.h" #include "jlib.hpp" #include "jbuff.hpp" @@ -51,13 +52,12 @@ JobQueues JobQueue @name= @count= @state=active|paused|stopped Edition - Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads) + Client @session= @connected= [@priority=n] @waiting= -- connections and waiting can no longer be > 1 Item* @wuid @owner @node @port @priority @session #endif - class CJobQueueItem: implements IJobQueueItem, public CInterface { int priority; @@ -395,7 +395,6 @@ class CJobQueueBase: implements IJobQueueConst, public CInterface } public: sQueueData *qdata; - Semaphore notifysem; CriticalSection crit; IMPLEMENT_IINTERFACE; @@ -789,37 +788,43 @@ class CJobQueueConst: public CJobQueueBase class CJobQueue: public CJobQueueBase, implements IJobQueue { public: - sQueueData *activeq; + sQueueData *activeq = nullptr; SessionId sessionid; - unsigned locknest; - bool writemode; - bool connected; + unsigned locknest = 0; + bool writemode = false; + bool connected = false; Owned initiateconv; StringAttr initiatewu; - bool dequeuestop; - bool cancelwaiting; - bool validateitemsessions; + std::atomic isProcessingDequeue = 0; + bool dequeuestop = false; + bool cancelwaiting = false; + bool validateitemsessions = false; - class csubs: implements ISDSSubscription, public CInterface + class QueueChangeSubscription : implements ISDSSubscription, public CInterface { - CJobQueue *parent; + public: + //If this semaphone is in the CJobQueue class then there is a race condition + //A callback may be at this point while the CJobQueue is deleted - causing it to signal + //a deleted semaphore + Semaphore notifysem; public: IMPLEMENT_IINTERFACE; - csubs(CJobQueue *_parent) - { - parent = _parent; - } + void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) { - CriticalBlock block(parent->crit); - parent->notifysem.signal(); + notifysem.signal(); } - } subs; + }; - IMPLEMENT_IINTERFACE; + //This must be an owned pointer, rather than a member, to avoid it being deleted while the notify() + //callback is being called. + Owned notifySubscription; - CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this) + IMPLEMENT_IINTERFACE_USING(CJobQueueBase); + + CJobQueue(const char *_qname) : CJobQueueBase(_qname) { + notifySubscription.setown(new QueueChangeSubscription); activeq = qdata; sessionid = myProcessSession(); validateitemsessions = false; @@ -1037,7 +1042,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } StringBuffer path; path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); - qd->subscriberid = querySDS().subscribe(path.str(), subs, false); + qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false); } } @@ -1048,7 +1053,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue if (!qd->subscriberid) { StringBuffer path; path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); - qd->subscriberid = querySDS().subscribe(path.str(), subs, false); + qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false); } unsigned e = (unsigned)qd->root->getPropInt("Edition", 1); if (e!=qd->lastWaitEdition) { @@ -1128,7 +1133,24 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } } - sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues) + bool hasHigherPriorityClients(IPropertyTree * queueTree, __uint64 clientPrio, unsigned threshold) + { + unsigned higher = 0; + Owned iter = queueTree->getElements("Client"); + ForEach(*iter) + { + unsigned __int64 priority = iter->query().getPropInt64("@priority", 0); + if (priority > clientPrio) + { + higher++; + if (higher >= threshold) + return true; + } + } + return false; + } + + sQueueData *findbestqueue(bool useprev,int minprio,__uint64 clientPrio,unsigned numqueues,sQueueData **queues) { if (numqueues==0) return NULL; @@ -1139,7 +1161,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue for (unsigned i=0;iroot->getPropInt("@count"); - if (count) { + if (count) + { + if (hasHigherPriorityClients(qd->root, clientPrio, count)) + continue; + int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio; if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) { StringBuffer path; @@ -1160,17 +1186,33 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return best; } - void setWaiting(unsigned numqueues,sQueueData **queues, bool set) + void setWaiting(unsigned numqueues,sQueueData **queues, unsigned __int64 clientPrio, bool set) { for (unsigned i=0; isetPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1)); + //If a non-zero client priority has been specified, add (or remove) it from the list of priorities + if (clientPrio) + { + if (set) + croot->setPropInt64("@priority", clientPrio); + else + croot->removeProp("@priority"); + } } } // 'simple' queuing - IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL) + IJobQueueItem *dodequeue(int minprio, __uint64 clientPrio, unsigned timeout, bool useprev, bool * timedout) { + //If more than one thread is waiting on the queue, then the queue code does not work correctly + //It is undefined which thread the semaphore signal will wake up. + //E.g. there is one thread with a minimum priority of 0, and another with a minimum of 100, and an item of + //priority 50 is queued. If the minimum priority of 100 is woken twice nothing will be dequeued. + //Similar problems occur when the clientPriority is mixed. + if (isProcessingDequeue.exchange(true)) + throw MakeStringException(0, "Multiple concurrent dequeue not supported"); + bool hasminprio=(minprio!=INT_MIN); if (timedout) *timedout = false; @@ -1200,23 +1242,30 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue active.append(qd); } if (stopped==total) + { + isProcessingDequeue.store(false); return NULL; // all stopped + } sQueueData **activeqds = (sQueueData **)active.getArray(); unsigned activenum = active.ordinality(); if (activenum) { - sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds); + sQueueData *bestqd = findbestqueue(useprev,minprio,clientPrio,activenum,activeqds); unsigned count = bestqd?bestqd->root->getPropInt("@count"):0; // load minp from cache - if (count) { - int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio; - if (!hasminprio||checkprio(*bestqd,mpr)) { - block.setRollback(false); - ret = dotake(*bestqd,NULL,true,hasminprio,mpr); - if (ret) // think it must be! - timeout = 0; // so mark that done - else if (!hasminprio) { - WARNLOG("Resetting queue %s",bestqd->qname.get()); - clear(*bestqd); // reset queue as seems to have become out of sync + if (count) + { + if (!hasHigherPriorityClients(bestqd->root, clientPrio, count)) + { + int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio; + if (!hasminprio||checkprio(*bestqd,mpr)) { + block.setRollback(false); + ret = dotake(*bestqd,NULL,true,hasminprio,mpr); + if (ret) // think it must be! + timeout = 0; // so mark that done + else if (!hasminprio) { + WARNLOG("Resetting queue %s",bestqd->qname.get()); + clear(*bestqd); // reset queue as seems to have become out of sync + } } } } @@ -1226,7 +1275,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue block.setRollback(false); } if (!waitingset) { - setWaiting(activenum,activeqds,true); + setWaiting(activenum, activeqds, clientPrio, true); block.commit(); waitingset = true; } @@ -1234,7 +1283,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } if (timeout==0) { if (waitingset) { - setWaiting(activenum,activeqds,false); + setWaiting(activenum, activeqds, clientPrio, false); block.commit(); } if (timedout) @@ -1246,7 +1295,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue // check every 5 mins independant of notify (in case subscription lost for some reason) if (to>timeout) to = timeout; - notifysem.wait(to); + notifySubscription->notifysem.wait(to); if (timeout!=(unsigned)INFINITE) { t = msTick()-t; if (t qi = qitem; @@ -1627,6 +1683,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return (state&&(strcmp(state,"stopped")==0)); } + void removeClient(sQueueData & qd, IPropertyTree * croot) + { + qd.root->removeTree(croot); + } + void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued) { Cconnlockblock block(this,false); @@ -1639,7 +1700,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue break; if (validateitemsessions && !validSession(croot)) { Cconnlockblock block(this,true); - qd.root->removeTree(croot); + removeClient(qd, croot); } else { waiting += croot->getPropInt("@waiting"); @@ -1771,7 +1832,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue int minprio = 0; unsigned timeout = prioritytransitiondelay; bool usePrevPrio = true; - item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout)); + item.setown(dodequeue(minprio, 0, timeout, usePrevPrio, &timedout)); } else item.setown(dequeue(INFINITE)); @@ -1814,7 +1875,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue { CriticalBlock block(crit); dequeuestop = true; - notifysem.signal(); + notifySubscription->notifysem.signal(); } bool cancelInitiateConversation(sQueueData &qd,const char *wuid) @@ -1851,7 +1912,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue if (haschanged()) return true; } - if (!notifysem.wait(timeout)) + if (!notifySubscription->notifysem.wait(timeout)) break; } return false; @@ -1860,7 +1921,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue { CriticalBlock block(crit); cancelwaiting = true; - notifysem.signal(); + notifySubscription->notifysem.signal(); } virtual void enqueue(IJobQueueItem *qitem) diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index 4fdcda7a837..ed7cc2e2257 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -121,6 +121,7 @@ interface IJobQueue: extends IJobQueueConst // validateitemsessions ensures that all queue items have running session virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0; virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0; + virtual IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)=0; virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release) virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running virtual bool waitStatsChange(unsigned timeout)=0; diff --git a/testing/unittests/dalitests.cpp b/testing/unittests/dalitests.cpp index 444f2117e8c..a7e1bd124c5 100644 --- a/testing/unittests/dalitests.cpp +++ b/testing/unittests/dalitests.cpp @@ -3434,11 +3434,36 @@ class DaliJobQueueTester : public CppUnit::TestFixture }; + class PriorityJobProcessor : public JobProcessor + { + public: + PriorityJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + __uint64 priority = 0; + for (;;) + { + Owned item = queue->dequeuePriority(priority); + if (!item) + item.setown(queue->dequeue(0, INFINITE, 0)); + bool ret = processItem(item); + if (!ret) + break; + priority = getTimeStampNowValue(); + } + } + }; + enum JobProcessorType { StandardProcessor, ThorProcessor, NewThorProcessor, + PriorityProcessor, }; void testInit() @@ -3453,108 +3478,139 @@ class DaliJobQueueTester : public CppUnit::TestFixture void runTestCase(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults, bool uniqueQueues) { - Owned queue = createJobQueue("DaliJobQueueTester"); - Semaphore startedSem; - Semaphore processedSem; - - CIArrayOf jobProcessors; - for (auto & processor : processors) + try { - JobProcessor * cur = nullptr; - Owned localQueue; - IJobQueue * processorQueue = queue; - if (uniqueQueues) - { - localQueue.setown(createJobQueue("DaliJobQueueTester")); - processorQueue = localQueue; - } + Owned queue = createJobQueue("DaliJobQueueTester"); + queue->connect(true); + queue->clear(); - switch (processor) + Semaphore startedSem; + Semaphore processedSem; + + CIArrayOf jobProcessors; + for (auto & processor : processors) { - case StandardProcessor: - cur = new StandardJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - case ThorProcessor: - cur = new ThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - case NewThorProcessor: - cur = new NewThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - default: - UNIMPLEMENTED; + JobProcessor * cur = nullptr; + Owned localQueue; + IJobQueue * processorQueue = queue; + if (uniqueQueues) + { + localQueue.setown(createJobQueue("DaliJobQueueTester")); + processorQueue = localQueue; + } + + switch (processor) + { + case StandardProcessor: + cur = new StandardJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case ThorProcessor: + cur = new ThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case NewThorProcessor: + cur = new NewThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case PriorityProcessor: + cur = new PriorityJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + default: + UNIMPLEMENTED; + } + jobProcessors.append(*cur); + cur->start(true); } - jobProcessors.append(*cur); - cur->start(true); - } - for (auto & processor : processors) - startedSem.wait(); + for (auto & processor : processors) + startedSem.wait(); - IArrayOf conversations; - jobQueueStartTick = msTick(); - for (auto & job : jobs) - { - jobQueueSleep(job.delayMs); - if (traceJobQueue) - DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick()); - Owned item = createJobQueueItem(job.name); - item->setPort(job.processingMs); - item->setPriority(job.priority); + IArrayOf conversations; + jobQueueStartTick = msTick(); + for (auto & job : jobs) + { + jobQueueSleep(job.delayMs); + if (traceJobQueue) + DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick()); + Owned item = createJobQueueItem(job.name); + item->setPort(job.processingMs); + item->setPriority(job.priority); - queue->enqueue(item.getClear()); - } + queue->enqueue(item.getClear()); + } - ForEachItemIn(i1, jobProcessors) - { - if (traceJobQueue) - DBGLOG("Add (eoj) @%u", getJobQueueTick()); + for (;;) + { + //Wait until all the items have been processed before adding the special end markers + //otherwise the ends will be interpreted as valid items, and may cause the items to + //be dequeued by the wrong thread. + unsigned connected; + unsigned waiting; + unsigned enqueued; + queue->getStats(connected,waiting,enqueued); + if (enqueued == 0) + break; + MilliSleep(100 * tickScaling); + } - //The queue code dedups by "wuid", so we need to add a unique "stop" entry - std::string end = std::string("!") + std::to_string(i1); - Owned item = createJobQueueItem(end.c_str()); - queue->enqueue(item.getClear()); - } + ForEachItemIn(i1, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Add (eoj) @%u", getJobQueueTick()); - ForEachItemIn(i2, jobProcessors) - { - if (traceJobQueue) - DBGLOG("Wait for %u", i2); - jobProcessors.item(i2).join(); - } + //The queue code dedups by "wuid", so we need to add a unique "stop" entry + std::string end = std::string("!") + std::to_string(i1); + Owned item = createJobQueueItem(end.c_str()); + queue->enqueue(item.getClear()); + } - DBGLOG("%s:%s, %ums", name, uniqueQueues ? " unique queues" : "", getJobQueueTick()); - ForEachItemIn(i3, jobProcessors) - { - JobProcessor & cur = jobProcessors.item(i3); - DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog()); + ForEachItemIn(i2, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Wait for %u", i2); + jobProcessors.item(i2).join(); + } - //If expected results are provided, check that the result matches one of them (it is undefined which - //processor will match which result) - if (expectedResults.size()) + DBGLOG("%s:%s, %ums", name, uniqueQueues ? " unique queues" : "", getJobQueueTick()); + unsigned numProcessors = processors.size(); + ForEachItemIn(i3, jobProcessors) { - bool matched = false; - StringBuffer expectedText; - for (auto & expected : expectedResults) + JobProcessor & cur = jobProcessors.item(i3); + DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog()); + + //If expected results are provided, check that the result matches one of them (it is undefined which + //processor will match which result) + if (expectedResults.size()) { - if (streq(expected, cur.queryOutput())) + bool matched = false; + StringBuffer expectedText; + for (auto & expected : expectedResults) { - matched = true; - break; + if (streq(expected, cur.queryOutput())) + { + matched = true; + break; + } + expectedText.append(", ").append(expected); + } + if (!matched) + { + DBGLOG("Test %s: No match for output %u: %s", name, i3, expectedText.str()+2); + CPPUNIT_ASSERT_MESSAGE("Result does not match any of the expected results", false); } - expectedText.append(", ").append(expected); - } - if (!matched) - { - DBGLOG("Result does not match any expected: %s", expectedText.str()+2); - CPPUNIT_ASSERT_MESSAGE("Result does not match any of the expected results", false); } } } + catch (IException * e) + { + StringBuffer msg("Fail: "); + e->errorMessage(msg); + e->Release(); + CPPUNIT_ASSERT_MESSAGE(msg.str(), 0); + } } void runTestCaseX2(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults) { - runTestCase(name, jobs, processors, expectedResults, false); + //runTestCase(name, jobs, processors, expectedResults, false); runTestCase(name, jobs, processors, expectedResults, true); } @@ -3644,7 +3700,7 @@ class DaliJobQueueTester : public CppUnit::TestFixture runTestCase("lo hi2 wu, 1 thor", lowHigh2Test, { ThorProcessor }, {}, false); runTestCase("lo hi2 wu, 1 newthor", lowHigh2Test, { NewThorProcessor }, {}, false); runTestCase("drip wu, 1 std", dripFeedTest, { StandardProcessor }, {}, false); - + runTestCase("drip wu, 1 std", dripFeedTest, { PriorityProcessor }, {}, false); } void testDouble() @@ -3657,15 +3713,26 @@ class DaliJobQueueTester : public CppUnit::TestFixture runTestCaseX2("lo hi3 wu, 2 thor", lowHigh3Test, { ThorProcessor, ThorProcessor }, {}); runTestCaseX2("lo hi3 wu, 2 newthor", lowHigh3Test, { NewThorProcessor, NewThorProcessor }, {}); + runTestCaseX2("lo hi3 wu, 2 prio", lowHigh3Test, { PriorityProcessor, PriorityProcessor }, {}); runTestCaseX2("drip wu, 2 std", dripFeedTest, { StandardProcessor, StandardProcessor }, {}); runTestCaseX2("drip wu, 2 newthor", dripFeedTest, { NewThorProcessor, NewThorProcessor }, {}); + runTestCaseX2("drip wu, 2 prio", dripFeedTest, { PriorityProcessor, PriorityProcessor }, { "abcdefghij", "" }); } void testMany() { runTestCaseX2("drip wu, 3 std", dripFeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); runTestCaseX2("drip2 wu, 3 std", drip2FeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); + runTestCaseX2("drip wu, 3 prio", dripFeedTest, { PriorityProcessor, PriorityProcessor, PriorityProcessor }, { "abcdefghij", "", "" }); + runTestCaseX2("drip2 wu, 3 prio", drip2FeedTest, { PriorityProcessor, PriorityProcessor, PriorityProcessor }, { "acegikmo", "bdfhjln", ""}); } + + //MORE Tests: + //Many requests at a time in waves + //Priority 1,2,3 fixed - not dynamic + //Stopping listening after N to check priorities removed correctly + //Mix standard and priority + //Priority with expiring and gaps to ensure the correct client picks up the items. }; CPPUNIT_TEST_SUITE_REGISTRATION( DaliJobQueueTester );