Skip to content

Commit

Permalink
Remove multiThorPriorityLock semantics from queueing
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Nov 5, 2024
1 parent 31dc9b3 commit eb522a1
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 107 deletions.
22 changes: 6 additions & 16 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1742,33 +1742,23 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return initiateconv.getClear();
}

IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp)
IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay)
{
CriticalBlock block(crit);
retitem = NULL;
assertex(connected); // must be connected
int prevmp = maxp?maxp->get():0;
int curmp = prevmp;
for (;;) {
bool timedout = false;
Owned<IJobQueueItem> item;
{
CriticalUnblock unblock(crit);
// this is a bit complicated with multi-thor
if (prioritytransitiondelay||maxp)
if (prioritytransitiondelay)
{
int itemmax = (std::max(prevmp,curmp)/10)*10; // round down to multiple of 10
unsigned timeout = prioritytransitiondelay?prioritytransitiondelay:60000;
// if dynamic priority check every minute
bool usePrevPrio = prioritytransitiondelay>0;
item.setown(dodequeue(itemmax, timeout, usePrevPrio, &timedout));

//If not waiting for a transition, then update the minimum priority that is acceptable
if (!prioritytransitiondelay)
{
prevmp = curmp; // using max above is a bit devious to allow transition
curmp = maxp->get();
}
int minprio = 0;
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout));
}
else
item.setown(dequeue(INFINITE));
Expand Down
19 changes: 4 additions & 15 deletions common/workunit/wujobq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,12 @@
*
* When an item is dequeued, the head of the queue is removed.
*
* There is an option in acceptConversatin(), currently used in thor, to say wait for up to 30 seconds if the priority at the header of the queue is lower
* There is an option in acceptConversation(), currently used in thor, to wait for up to 30 seconds if the priority at the header of the queue is lower
* than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there
* are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period.
* are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period.
*
* NOTE: If this logic is included I think this should really be the priority of the last item THIS thor dequeued.
*
* After 30 seconds has passed it will only dequeue items with a higher priority than the maximum priority currently being processed
* by a thor instance reading from that queue. I think that means that if there is a thor instance processing a high priority workunit
* no lower priority workunits will be selected.
*
* NOTE: I don't understand this logic.
*
*
* We want to add the following semantics:
*
Expand All @@ -67,7 +61,7 @@ interface IJobQueueItem: extends serializable
virtual unsigned getPort()=0; // conversation port (not used for DFU server)
virtual bool equals(IJobQueueItem *other)=0;

virtual void setPriority(int priority) [[deprecated]]=0;
virtual void setPriority(int priority)=0;
virtual void setOwner(const char *owner)=0;
virtual void setEndpoint(const SocketEndpoint &ep)=0;
virtual void setPort(unsigned)=0;
Expand All @@ -92,11 +86,6 @@ class WORKUNIT_API CJobQueueContents: public IArrayOf<IJobQueueItem>
IJobQueueIterator *getIterator(); // only valid during lifetime of CJobQueueContents
};

// Callback interface through which a caller supplies a priority value that may change between calls.
// The acceptConversation() declaration that takes an optional IDynamicPriority *maxp (default NULL)
// is the consumer visible here; its implementation calls get() on that argument when it is non-null.
interface IDynamicPriority
{
// Returns the current priority value as an int.
virtual int get()=0;
};

interface IJobQueueConst: extends IInterface
{
virtual unsigned ordinality()=0; // number of items on queue
Expand Down Expand Up @@ -160,7 +149,7 @@ interface IJobQueue: extends IJobQueueConst

// conversations:
virtual IConversation *initiateConversation(IJobQueueItem *item)=0; // does enqueue - take ownership of item
virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0,IDynamicPriority *maxp=NULL)=0;
virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0)=0;
// does dequeue - returns queue item dequeued
virtual void cancelInitiateConversation()=0; // cancels initiateConversation in progress
virtual bool cancelInitiateConversation(const char *wuid)=0; // cancels remote initiate
Expand Down
2 changes: 0 additions & 2 deletions initfiles/componentfiles/configschema/xsd/thor.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@
hpcc:tooltip="Port increment between slaves on same node"/>
<xs:attribute name="multiThorMemoryThreshold" type="xs:nonNegativeInteger" hpcc:displayName="Multi Thor Memory Threshold(MB)"
hpcc:tooltip="Memory usage (in MB) beneath which multiple Thors will run in parallel. Leave blank if no limit"/>
<xs:attribute name="multiThorPriorityLock" type="xs:boolean" hpcc:displayName="Mult Thor Priority Lock" hpcc:presetValue="false"
hpcc:tooltip="If set true, prevents lower priority jobs starting on a multithor"/>
<xs:attribute name="multiThorExclusionLockName" type="xs:string" hpcc:displayName="Multi Thor Exclusion Lock Name"
hpcc:tooltip="Prevents other thors (on any queue) sharing the same multiThorExclusionLockName name from running jobs at the same time"/>
<!--todo seems this needs to be true for multinode, what does that mean? (see old xsd and search for autogendefaultformultinode) -->
Expand Down
7 changes: 0 additions & 7 deletions initfiles/componentfiles/configxml/thor.xsd.in
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,6 @@
</xs:appinfo>
</xs:annotation>
</xs:attribute>
<xs:attribute name="multiThorPriorityLock" type="xs:boolean" use="optional" default="false">
<xs:annotation>
<xs:appinfo>
<tooltip>If set true, prevents lower priority jobs starting on a multithor</tooltip>
</xs:appinfo>
</xs:annotation>
</xs:attribute>
<xs:attribute name="multiThorExclusionLockName" type="xs:string" use="optional">
<xs:annotation>
<xs:appinfo>
Expand Down
70 changes: 3 additions & 67 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,47 +475,6 @@ class CIdleShutdown : public CSimpleInterface, implements IThreaded
void stop() { sem.signal(); }
};

static int getRunningMaxPriority(const char *qname)
{
int maxpriority = 0; // ignore neg
try {
Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
if (conn.get())
{
Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("Server"));
ForEach(*it) {
StringBuffer instance;
if(it->query().hasProp("@queue"))
{
const char* queue=it->query().queryProp("@queue");
if(queue&&(strcmp(queue,qname)==0)) {
Owned<IPropertyTreeIterator> wuids = it->query().getElements("WorkUnit");
ForEach(*wuids) {
IPropertyTree &wu = wuids->query();
const char* wuid=wu.queryProp(NULL);
if (wuid&&*wuid) {
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
if (workunit) {
int priority = workunit->getPriorityValue();
if (priority>maxpriority)
maxpriority = priority;
}
}
}
}
}
}
}
}
catch (IException *e)
{
IERRLOG(e,"getRunningMaxPriority");
e->Release();
}
return maxpriority;
}

bool CJobManager::fireException(IException *e)
{
IArrayOf<CJobMaster> jobList;
Expand Down Expand Up @@ -596,23 +555,6 @@ void CJobManager::run()
#endif

jobq.setown(createJobQueue(queueName.get()));
struct cdynprio: public IDynamicPriority
{
const char *qn;
int get()
{
int p = getRunningMaxPriority(qn);
if (p)
PROGLOG("Dynamic Min priority = %d",p);
return p;
}
} *dp = NULL;

if (globals->getPropBool("@multiThorPriorityLock")) {
PROGLOG("multiThorPriorityLock enabled");
dp = new cdynprio;
dp->qn = queueName.get();
}

PROGLOG("verifying mp connection to all slaves");
Owned<IMPServer> mpServer = getMPServer();
Expand Down Expand Up @@ -666,13 +608,8 @@ void CJobManager::run()
{
if (exclusiveLockName.length())
{
if (globals->getPropBool("@multiThorPriorityLock"))
FLLOG(MCoperatorWarning, "multiThorPriorityLock cannot be used in conjunction with multiThorExclusionLockName");
else
{
PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str());
exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str()));
}
PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str());
exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str()));
}
}
bool jobQConnected = false;
Expand Down Expand Up @@ -792,7 +729,7 @@ void CJobManager::run()
jobQConnected = true;
}
IJobQueueItem *_item;
conversation.setown(jobq->acceptConversation(_item,30*1000,dp)); // 30s priority transition delay
conversation.setown(jobq->acceptConversation(_item,30*1000)); // 30s priority transition delay
item.setown(_item);
}
}
Expand Down Expand Up @@ -875,7 +812,6 @@ void CJobManager::run()
// reset for next job
setProcessAborted(false);
}
delete dp;
jobq.clear();
}

Expand Down

0 comments on commit eb522a1

Please sign in to comment.