diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index e5346670cc2..47a35e8f1dd 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -1742,33 +1742,23 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return initiateconv.getClear(); } - IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp) + IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay) { CriticalBlock block(crit); retitem = NULL; assertex(connected); // must be connected - int prevmp = maxp?maxp->get():0; - int curmp = prevmp; for (;;) { bool timedout = false; Owned item; { CriticalUnblock unblock(crit); // this is a bit complicated with multi-thor - if (prioritytransitiondelay||maxp) + if (prioritytransitiondelay) { - int itemmax = (std::max(prevmp,curmp)/10)*10; // round down to multiple of 10 - unsigned timeout = prioritytransitiondelay?prioritytransitiondelay:60000; - // if dynamic priority check every minute - bool usePrevPrio = prioritytransitiondelay>0; - item.setown(dodequeue(itemmax, timeout, usePrevPrio, &timedout)); - - //If not waiting for a transition, then update the minimum priroty that is acceptable - if (!prioritytransitiondelay) - { - prevmp = curmp; // using max above is a bit devious to allow transition - curmp = maxp->get(); - } + int minprio = 0; + unsigned timeout = prioritytransitiondelay; + bool usePrevPrio = true; + item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout)); } else item.setown(dequeue(INFINITE)); diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index 15f3b426106..c3209904a50 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -31,18 +31,12 @@ * * When an item is dequeued, the head of the queue is removed. * - * There is an option in acceptConversatin(), currently used in thor, to say wait for up to 30 seconds if the priority at the header of the queue is lower + * There is an option in acceptConversation(), currently used in thor, to wait for up to 30 seconds if the priority at the header of the queue is lower * than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there - * are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period. + * are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period. * * NOTE: If this logic is included I think this should really be the priority of the last item THIS thor dequeued. * - * After 30 seconds has passed it will only dequeue items with a higher priority than the maximum priority currently being processed - * by a thor instance reading from that queue. I think that means that if there is a thor instance processing a high priority workunit - * no lower priority workunits will be selected. - * - * NOTE: I don't understand this logic. - * * * We want to add the following semantics: * @@ -67,7 +61,7 @@ interface IJobQueueItem: extends serializable virtual unsigned getPort()=0; // conversation port (not used for DFU server) virtual bool equals(IJobQueueItem *other)=0; - virtual void setPriority(int priority) [[deprecated]]=0; + virtual void setPriority(int priority)=0; virtual void setOwner(const char *owner)=0; virtual void setEndpoint(const SocketEndpoint &ep)=0; virtual void setPort(unsigned)=0; @@ -92,11 +86,6 @@ class WORKUNIT_API CJobQueueContents: public IArrayOf IJobQueueIterator *getIterator(); // only valid during lifetime of CJobQueueContents }; -interface IDynamicPriority -{ - virtual int get()=0; -}; - interface IJobQueueConst: extends IInterface { virtual unsigned ordinality()=0; // number of items on queue @@ -160,7 +149,7 @@ interface IJobQueue: extends IJobQueueConst // conversations: virtual IConversation *initiateConversation(IJobQueueItem *item)=0; // does enqueue - take ownership of item - virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0,IDynamicPriority *maxp=NULL)=0; + virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0)=0; // does dequeue - returns queue item dequeued virtual void cancelInitiateConversation()=0; // cancels initiateConversation in progress virtual bool cancelInitiateConversation(const char *wuid)=0; // cancels remote initiate diff --git a/initfiles/componentfiles/configschema/xsd/thor.xsd b/initfiles/componentfiles/configschema/xsd/thor.xsd index c9183448b86..8316e0e91d7 100644 --- a/initfiles/componentfiles/configschema/xsd/thor.xsd +++ b/initfiles/componentfiles/configschema/xsd/thor.xsd @@ -175,8 +175,6 @@ hpcc:tooltip="Port increment between slaves on same node"/> - diff --git a/initfiles/componentfiles/configxml/thor.xsd.in b/initfiles/componentfiles/configxml/thor.xsd.in index 696e0985b5f..4e882a3e097 100644 --- a/initfiles/componentfiles/configxml/thor.xsd.in +++ b/initfiles/componentfiles/configxml/thor.xsd.in @@ -491,13 +491,6 @@ - - - - If set true, prevents lower priority jobs starting on a multithor - - - diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 475432c45dd..ed2bd7d7f1c 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -475,47 +475,6 @@ class CIdleShutdown : public CSimpleInterface, implements IThreaded void stop() { sem.signal(); } }; -static int getRunningMaxPriority(const char *qname) -{ - int maxpriority = 0; // ignore neg - try { - Owned conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000); - if (conn.get()) - { - Owned it(conn->queryRoot()->getElements("Server")); - ForEach(*it) { - StringBuffer instance; - if(it->query().hasProp("@queue")) - { - const char* queue=it->query().queryProp("@queue"); - if(queue&&(strcmp(queue,qname)==0)) { - Owned wuids = it->query().getElements("WorkUnit"); - ForEach(*wuids) { - IPropertyTree &wu = wuids->query(); - const char* wuid=wu.queryProp(NULL); - if (wuid&&*wuid) { - Owned factory = getWorkUnitFactory(); - Owned workunit = factory->openWorkUnit(wuid); - if (workunit) { - int priority = workunit->getPriorityValue(); - if (priority>maxpriority) - maxpriority = priority; - } - } - } - } - } - } - } - } - catch (IException *e) - { - IERRLOG(e,"getRunningMaxPriority"); - e->Release(); - } - return maxpriority; -} - bool CJobManager::fireException(IException *e) { IArrayOf jobList; @@ -596,23 +555,6 @@ void CJobManager::run() #endif jobq.setown(createJobQueue(queueName.get())); - struct cdynprio: public IDynamicPriority - { - const char *qn; - int get() - { - int p = getRunningMaxPriority(qn); - if (p) - PROGLOG("Dynamic Min priority = %d",p); - return p; - } - } *dp = NULL; - - if (globals->getPropBool("@multiThorPriorityLock")) { - PROGLOG("multiThorPriorityLock enabled"); - dp = new cdynprio; - dp->qn = queueName.get(); - } PROGLOG("verifying mp connection to all slaves"); Owned mpServer = getMPServer(); @@ -666,13 +608,8 @@ void CJobManager::run() { if (exclusiveLockName.length()) { - if (globals->getPropBool("@multiThorPriorityLock")) - FLLOG(MCoperatorWarning, "multiThorPriorityLock cannot be used in conjunction with multiThorExclusionLockName"); - else - { - PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str()); - exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str())); - } + PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str()); + exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str())); } } bool jobQConnected = false; @@ -792,7 +729,7 @@ void CJobManager::run() jobQConnected = true; } IJobQueueItem *_item; - conversation.setown(jobq->acceptConversation(_item,30*1000,dp)); // 30s priority transition delay + conversation.setown(jobq->acceptConversation(_item,30*1000)); // 30s priority transition delay item.setown(_item); } } @@ -875,7 +812,6 @@ void CJobManager::run() // reset for next job setProcessAborted(false); } - delete dp; jobq.clear(); }