Skip to content

Commit

Permalink
Rename variables and update comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Nov 5, 2024
1 parent f466031 commit 31dc9b3
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
22 changes: 14 additions & 8 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1747,21 +1747,27 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
CriticalBlock block(crit);
retitem = NULL;
assertex(connected); // must be connected
int curmp = maxp?maxp->get():0;
int nextmp = curmp;
int prevmp = maxp?maxp->get():0;
int curmp = prevmp;
for (;;) {
bool timedout = false;
Owned<IJobQueueItem> item;
{
CriticalUnblock unblock(crit);
// this is a bit complicated with multi-thor
if (prioritytransitiondelay||maxp) {
item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10
prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout));
if (prioritytransitiondelay||maxp)
{
int itemmax = (std::max(prevmp,curmp)/10)*10; // round down to multiple of 10
unsigned timeout = prioritytransitiondelay?prioritytransitiondelay:60000;
// if dynamic priority check every minute
if (!prioritytransitiondelay) {
curmp = nextmp; // using max above is a bit devious to allow transition
nextmp = maxp->get();
bool usePrevPrio = prioritytransitiondelay>0;
item.setown(dodequeue(itemmax, timeout, usePrevPrio, &timedout));

//If not waiting for a transition, then update the minimum priroty that is acceptable
if (!prioritytransitiondelay)
{
prevmp = curmp; // using max above is a bit devious to allow transition
curmp = maxp->get();
}
}
else
Expand Down
27 changes: 24 additions & 3 deletions common/workunit/wujobq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,30 @@
* If a position is given, then the priority may be adjusted to ensure it is consistent with the items before it and after it in the queue
*
* When an item is dequeued, the head of the queue is removed.
* - There is an option, currently used in thor, to say wait for up to 30 seconds if the priority at the header of the queue is lower
* than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there
* are short pauses between graphs. However, this will prevent all thor instances from dequeuing for that period.
*
* There is an option in acceptConversatin(), currently used in thor, to say wait for up to 30 seconds if the priority at the header of the queue is lower
* than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there
* are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period.
*
* NOTE: If this logic is included I think this should really be the priority of the last item THIS thor dequeued.
*
* After 30 seconds has passed it will only dequeue items with a higher priority than the maximum priority currently being processed
* by a thor instance reading from that queue. I think that means that if there is a thor instance processing a high priority workunit
* no lower priority workunits will be selected.
*
* NOTE: I don't understand this logic.
*
*
* We want to add the following semantics:
*
* When a server requests to dequeue an item it can pass a worker priority.
* - If there is an item on the queue, then dequeue it
* - Otherwise record the worker priority in the Client information. (If there are multiple threads do they have to have the same priority??)
* - When an item is received, the WAITING worker with the highest priority gets to process it.
*
* Problems:
* Multiple threads from a single client
* Ensuring there are no race conditions
*
*/
interface IJobQueueItem: extends serializable
Expand Down

0 comments on commit 31dc9b3

Please sign in to comment.