Skip to content

Commit

Permalink
Fix race condition
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Nov 19, 2024
1 parent 5e124d3 commit dde7ce4
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ class CJobQueueBase: implements IJobQueueConst, public CInterface
}
public:
sQueueData *qdata;
Semaphore notifysem;
CriticalSection crit;

IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -801,32 +800,31 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
bool cancelwaiting = false;
bool validateitemsessions = false;

class csubs: implements ISDSSubscription, public CInterface
class QueueChangeSubscription : implements ISDSSubscription, public CInterface
{
CJobQueue *parent;
public:
//If this semaphone is in the CJobQueue class then there is a race condition
//A callback may be at this point while the CJobQueue is deleted - causing it to signal
//a deleted semaphore
Semaphore notifysem;
public:
IMPLEMENT_IINTERFACE;
csubs(CJobQueue *_parent)
{
parent = _parent;
}

void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
{
//There is a race condition - a callback may be at this point while the CJobQueue is deleted.
//Adding a critical section in parent makes it much more likely to be hit.
//Ultimately the semaphore should be moved to this class instead
//CriticalBlock block(parent->crit);
parent->notifysem.signal();
notifysem.signal();
}
};

Owned<csubs> subs;
//This must be an owned pointer, rather than a member, to avoid it being deleted while the notify()
//callback is being called.
Owned<QueueChangeSubscription> notifySubscription;

IMPLEMENT_IINTERFACE_USING(CJobQueueBase);

CJobQueue(const char *_qname) : CJobQueueBase(_qname)
{
subs.setown(new csubs(this));
notifySubscription.setown(new QueueChangeSubscription);
activeq = qdata;
sessionid = myProcessSession();
validateitemsessions = false;
Expand Down Expand Up @@ -1044,7 +1042,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), *subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false);
}
}

Expand All @@ -1055,7 +1053,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (!qd->subscriberid) {
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), *subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false);
}
unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
if (e!=qd->lastWaitEdition) {
Expand Down Expand Up @@ -1297,7 +1295,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
// check every 5 mins independant of notify (in case subscription lost for some reason)
if (to>timeout)
to = timeout;
notifysem.wait(to);
notifySubscription->notifysem.wait(to);
if (timeout!=(unsigned)INFINITE) {
t = msTick()-t;
if (t<timeout)
Expand Down Expand Up @@ -1878,7 +1876,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
{
CriticalBlock block(crit);
dequeuestop = true;
notifysem.signal();
notifySubscription->notifysem.signal();
}

bool cancelInitiateConversation(sQueueData &qd,const char *wuid)
Expand Down Expand Up @@ -1915,7 +1913,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (haschanged())
return true;
}
if (!notifysem.wait(timeout))
if (!notifySubscription->notifysem.wait(timeout))
break;
}
return false;
Expand All @@ -1924,7 +1922,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
{
CriticalBlock block(crit);
cancelwaiting = true;
notifysem.signal();
notifySubscription->notifysem.signal();
}

virtual void enqueue(IJobQueueItem *qitem)
Expand Down

0 comments on commit dde7ce4

Please sign in to comment.