diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index 6ac786c5608..e97543f933a 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -179,18 +179,6 @@ class CJobQueueItem: implements IJobQueueItem, public CInterface enqueuedt.setString(dts.str()); } - - - IJobQueueItem* clone() - { - IJobQueueItem* ret = new CJobQueueItem(wu); - ret->setPriority(priority); - ret->setPriority(port); - ret->setEndpoint(ep); - ret->setSessionId(sessid); - return ret; - } - void setPriority(int _priority) { priority = _priority; @@ -1445,24 +1433,6 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return dotake(qd,wuid,false); } - unsigned takeItems(sQueueData &qd,CJobQueueContents &dest) - { - Cconnlockblock block(this,true); - unsigned ret = copyItemsImpl(qd,dest); - clear(qd); - return ret; - } - - void enqueueItems(sQueueData &qd,CJobQueueContents &items) - { - unsigned n=items.ordinality(); - if (n) { - Cconnlockblock block(this,true); - for (unsigned i=0;inext) - return takeItems(*qdata,dest); - Cconnlockblock block(this,true); - unsigned ret = 0; - ForEachQueue(qd) { - ret += copyItemsImpl(*qd,dest); - clear(*qd); - } - return ret; - } - void enqueueItems(CJobQueueContents &items) - { // enqueues to firs sub-queue (not sure that useful) - assertex(qdata); - return enqueueItems(*qdata,items); - } void clear() { diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index 9435f380c89..5a3cc35f095 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -22,6 +22,19 @@ #include "jsocket.hpp" #include "dasess.hpp" +/* + * The job queues have the following semantics. + * + * Items are queued with a given priority, at a given offset + * If no position is given, then the insertion position in the queue is determined by finding the first item with a lower priority + * If a position is given, then the priority may be adjusted to ensure it is consistent with the items before it and after it in the queue + * + * When an item is dequeued, the head of the queue is removed. + * - There is an option, currently used in thor, to say wait for up to 30 seconds if the priority at the header of the queue is lower + * than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there + * are short pauses between graphs. However, this will prevent all thor instances from dequeuing for that period. + * + */ interface IJobQueueItem: extends serializable { virtual const char *queryWUID()=0; @@ -32,9 +45,8 @@ interface IJobQueueItem: extends serializable virtual unsigned getPort()=0; // conversation port (not used for DFU server) virtual bool equals(IJobQueueItem *other)=0; - virtual IJobQueueItem* clone()=0; - - virtual void setPriority(int priority)=0; + + virtual void setPriority(int priority) [[deprecated]]=0; virtual void setOwner(const char *owner)=0; virtual void setEndpoint(const SocketEndpoint &ep)=0; virtual void setPort(unsigned)=0; @@ -106,8 +118,6 @@ interface IJobQueue: extends IJobQueueConst //manipulation virtual IJobQueueItem *take(const char *wuid)=0; // finds and removes - virtual unsigned takeItems(CJobQueueContents &dest)=0; // takes items and clears queue - virtual void enqueueItems(CJobQueueContents &items)=0; // enqueues to first sub-queue virtual bool moveBefore(const char *wuid,const char *nextwuid)=0; virtual bool moveAfter(const char *wuid,const char *prevwuid)=0; virtual bool moveToHead(const char *wuid)=0;