Skip to content

Commit

Permalink
FIX: lockfree MPMC queue should not fail to pop/push when queue is no…
Browse files Browse the repository at this point in the history
…t empty/full

Signed-off-by: Coldwings <[email protected]>
  • Loading branch information
Coldwings committed Jun 6, 2024
1 parent 1738a58 commit a14f2a3
Showing 1 changed file with 8 additions and 20 deletions.
28 changes: 8 additions & 20 deletions common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class LockfreeMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
using Base::empty;
using Base::full;

bool push_weak(const T& x) {
bool push(const T& x) {
auto t = tail.load(std::memory_order_acquire);
for (;;) {
auto& slot = slots[idx(t)];
Expand All @@ -192,15 +192,16 @@ class LockfreeMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
}
} else {
auto const prevTail = t;
auto h = head.load(std::memory_order_acquire);
t = tail.load(std::memory_order_acquire);
if (t == prevTail) {
if (t == prevTail && Base::check_full(h, t)) {
return false;
}
}
}
}

bool pop_weak(T& x) {
bool pop(T& x) {
auto h = head.load(std::memory_order_acquire);
for (;;) {
auto& slot = slots[idx(h)];
Expand All @@ -213,28 +214,15 @@ class LockfreeMPMCRingQueue : public LockfreeRingQueueBase<T, N> {
}
} else {
auto const prevHead = h;
auto t = tail.load(std::memory_order_acquire);
h = head.load(std::memory_order_acquire);
if (h == prevHead) {
if (h == prevHead && Base::check_empty(h, t)) {
return false;
}
}
}
}

bool push(const T& x) {
do {
if (push_weak(x)) return true;
} while (!full());
return false;
}

bool pop(T& x) {
do {
if (pop_weak(x)) return true;
} while (!empty());
return false;
}

template <typename Pause = ThreadPause>
void send(const T& x) {
static_assert(std::is_base_of<PauseBase, Pause>::value,
Expand Down Expand Up @@ -536,8 +524,8 @@ namespace common {
* and load balancing.
* Watch out that `recv` should run in photon environment (because it has to)
* use photon semaphore to be notified that new item has sended. `send` could
* running in photon or std::thread environment (needs to set template `Pause` as
* `ThreadPause`).
* running in photon or std::thread environment (needs to set template `Pause`
* as `ThreadPause`).
*
* @tparam QueueType shoulde be one of LockfreeMPMCRingQueue,
* LockfreeBatchMPMCRingQueue, or LockfreeSPSCRingQueue, with their own template
Expand Down

0 comments on commit a14f2a3

Please sign in to comment.