diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index 71a40ebd882..e5346670cc2 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -1747,21 +1747,27 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue CriticalBlock block(crit); retitem = NULL; assertex(connected); // must be connected - int curmp = maxp?maxp->get():0; - int nextmp = curmp; + int prevmp = maxp?maxp->get():0; + int curmp = prevmp; for (;;) { bool timedout = false; Owned item; { CriticalUnblock unblock(crit); // this is a bit complicated with multi-thor - if (prioritytransitiondelay||maxp) { - item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10 - prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout)); + if (prioritytransitiondelay||maxp) + { + int itemmax = (std::max(prevmp,curmp)/10)*10; // round down to multiple of 10 + unsigned timeout = prioritytransitiondelay?prioritytransitiondelay:60000; // if dynamic priority check every minute - if (!prioritytransitiondelay) { - curmp = nextmp; // using max above is a bit devious to allow transition - nextmp = maxp->get(); + bool usePrevPrio = prioritytransitiondelay>0; + item.setown(dodequeue(itemmax, timeout, usePrevPrio, &timedout)); + + //If not waiting for a transition, then update the minimum priroty that is acceptable + if (!prioritytransitiondelay) + { + prevmp = curmp; // using max above is a bit devious to allow transition + curmp = maxp->get(); } } else diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index efa74462aaf..15f3b426106 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -30,9 +30,30 @@ * If a position is given, then the priority may be adjusted to ensure it is consistent with the items before it and after it in the queue * * When an item is dequeued, the head of the queue is removed. - * - There is an option, currently used in thor, to say wait for up to 30 seconds if the priority at the header of the queue is lower - * than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there - * are short pauses between graphs. However, this will prevent all thor instances from dequeuing for that period. + * + * There is an option in acceptConversatin(), currently used in thor, to say wait for up to 30 seconds if the priority at the header of the queue is lower + * than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there + * are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period. + * + * NOTE: If this logic is included I think this should really be the priority of the last item THIS thor dequeued. + * + * After 30 seconds has passed it will only dequeue items with a higher priority than the maximum priority currently being processed + * by a thor instance reading from that queue. I think that means that if there is a thor instance processing a high priority workunit + * no lower priority workunits will be selected. + * + * NOTE: I don't understand this logic. + * + * + * We want to add the following semantics: + * + * When a server requests to dequeue an item it can pass a worker priority. + * - If there is an item on the queue, then dequeue it + * - Otherwise record the worker priority in the Client information. (If there are multiple threads do they have to have the same priority??) + * - When an item is received, the WAITING worker with the highest priority gets to process it. + * + * Problems: + * Multiple threads from a single client + * Ensuring there are no race conditions * */ interface IJobQueueItem: extends serializable