
queue length for SQLiteQueue is incorrect when running in multiple processes #76

Open
melbaa opened this issue Nov 8, 2018 · 6 comments


@melbaa

melbaa commented Nov 8, 2018

Possibly expected behavior, but I think it's worth reporting, because the queue looks usable otherwise.

The queue size is set only once, on queue creation (self.total = self._count()), so if we have a producer in one process and a consumer in another, we end up with a size in the negatives.

To reproduce, we need a producer and a consumer that's faster than the producer.

# producer process
import time
import persistqueue as Q

q = Q.SQLiteQueue('queue', multithreading=True)
while True:
    q.put('hi')
    time.sleep(0.01)

# consumer process
import persistqueue as Q

q = Q.SQLiteQueue('queue', auto_commit=False, multithreading=True)
while True:
    try:
        print(q.qsize())  # drifts negative: only this process updates self.total
        q.get(block=False)
        q.task_done()
    except Q.exceptions.Empty:
        pass

Calling q._count() returns the correct size, because it hits the DB, of course.
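For illustration, a minimal sketch of the difference (queue path arbitrary; note that _count() is a private method, so relying on it is an implementation detail):

    import persistqueue as Q

    q = Q.SQLiteQueue('queue', multithreading=True)

    # qsize() returns the cached self.total, which only reflects puts/gets
    # made through this instance, so it drifts across processes.
    print(q.qsize())

    # _count() runs a fresh query against SQLite, so it also reflects
    # activity from other processes sharing the same queue directory.
    print(q._count())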

@peter-wangxu
Owner

Thanks for reporting. I haven't tested the queue in a multi-process environment. If you find further issues, let me know; I am happy to add the support.

@ThunderRush

Can confirm, this is indeed happening. However, it causes a somewhat annoying/strange issue: somehow, calling get() on an empty queue doesn't seem to work instantly, and instead introduces a delay.

If I push data into a FIFOSQLiteQueue from one process, the other one takes multiple seconds before actually fetching the values.

@bavaria95

bavaria95 commented Dec 10, 2018

@ThunderRush As far as I understand, get on an empty queue is supposed to wait until it can actually fetch something. If you don't want this behaviour, you can either pass block=False to get(), or pass a timeout parameter with the maximum time you are willing to wait.
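For example, a minimal sketch of both options (queue path arbitrary):

    import persistqueue

    q = persistqueue.SQLiteQueue('queue', multithreading=True)

    try:
        # Non-blocking: raises Empty immediately if nothing is queued.
        item = q.get(block=False)

        # Bounded wait: blocks at most 5 seconds, then raises Empty.
        item = q.get(timeout=5)
    except persistqueue.exceptions.Empty:
        pass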

@peter-wangxu
Owner

Multi-process support has not been added to this lib yet; I would be happy if anyone is interested in providing this ability. @melbaa @ThunderRush @bavaria95

@imidoriya
Collaborator

Recent updates have added max(0, count) to clamp qsize() at zero. That doesn't change the underlying issue, but it prevents impossible negative results. On the ack queues, a new active_size() was added which includes the nack cache. It may be anecdotal, but I believe this has produced more accurate results in my multi-threaded environment, since it is recalculated when an item is put/ack/ack_failed rather than on put/get/nack. Ultimately, though, that's a decision about when the queue size should be decremented: on get, or on completion.
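A minimal sketch of the difference, assuming a persist-queue release recent enough to ship active_size() on SQLiteAckQueue:

    import persistqueue

    q = persistqueue.SQLiteAckQueue('ackqueue', multithreading=True)
    q.put('hi')

    item = q.get()          # the item moves into the nack cache
    print(q.qsize())        # 0: decremented on get
    print(q.active_size())  # 1: still counts the unacknowledged item

    q.ack(item)             # mark the item as done
    print(q.active_size())  # 0: decremented on ack/ack_failed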

@decatur

decatur commented May 28, 2021

Yes, this seems to be a bug. So instead of

    while not queue.empty():
        item = queue.get()
        pprint(item)
        queue.task_done()

use the workaround

    import persistqueue.exceptions
    from pprint import pprint

    while True:
        try:
            item = queue.get(block=False)
        except persistqueue.exceptions.Empty:
            break
        pprint(item)
        queue.task_done()
