Skip to content

Commit

Permalink
Grjones use setnx for locking (#67)
Browse files Browse the repository at this point in the history
* Use setnx-based lock to avoid race conditions
  • Loading branch information
cameronmaske authored Apr 26, 2018
1 parent 09cb6ff commit 17b8114
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 36 deletions.
21 changes: 21 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
History
=======

2.0.0
-----

2018-04-25

Major Release:

This changes the Redis backend to use a SETNX-based lock (RedLock). This should address race conditions that the previous approach had (See: #7, #60).

*This may not be backwards compatible with existing keys stored in Redis.*
If you are upgrading from `1.0.0`, it may be safer to remove any previous used lock keys (See https://github.com/cameronmaske/celery-once/pull/67#issuecomment-384281438 for instructions).

Other changes include:

- Able to run on blocking mode when scheduling tasks with Redis backend. See the README for more details.

- ``AlreadyQueued`` exception return's countdown seconds as `float` instead of `int`.

Big thanks to @grjones for his contributions for this patch.


1.3.0
-----

Expand Down
24 changes: 22 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,9 @@ Backends
Redis Backend
-------------


Requires:

* `Redis <http://redis.io/>`_ is used as a distributed locking mechanism.
* `Redis <http://redis.io/>`_ is used as a distributed locking mechanism. Behind the scenes, it use Redlock. This `page <https://redis.io/topics/distlock>`_ has more infomation about the locking guarantees.

Configuration:

Expand All @@ -199,6 +198,10 @@ Configuration:

- ``url`` - should point towards a running Redis instance (defaults to ``redis://localhost:6379/0``). See below for the format options supported

- ``blocking`` (boolean value: default ``False``) - If set to ``True``, scheduling a task (by ``.delay/.apply_async``) will block for X seconds to acquire the lock (see: ``blocking_timeout`` below). If no lock could be acquired after X seconds, will raise an ``AlreadyQueued`` exception. This is a very specific use-case scenario and by default is disabled.

- ``blocking_timeout`` (int or float value: default ``1``) - How many seconds the task will block trying to aquire the lock, if ``blocking`` is set to ``True``. Setting this to ``None`` set's no timeout (equivalent to infinite seconds).



The URL parser supports two patterns of urls:
Expand All @@ -216,6 +219,8 @@ The URL parser supports two patterns of urls:

Example Configuration:

Minimal:

.. code:: python
celery.conf.ONCE = {
Expand All @@ -227,6 +232,21 @@ Example Configuration:
}
Advanced:
Scheduling tasks blocks up to 30 seconds trying to acquire a lock before raising an exception.

.. code:: python
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60,
'blocking': True,
'blocking_timeout': 30
}
}
Custom Backend
--------------

Expand Down
2 changes: 1 addition & 1 deletion celery_once/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

__author__ = 'Cameron Maske'
__email__ = '[email protected]'
__version__ = '1.3.0'
__version__ = '2.0.0'


from .tasks import QueueOnce, AlreadyQueued
40 changes: 25 additions & 15 deletions celery_once/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# Python 3!
from urllib.parse import urlparse, parse_qsl

from celery_once.helpers import now_unix
from celery_once.tasks import AlreadyQueued


Expand Down Expand Up @@ -55,6 +54,13 @@ def parse_url(url):

redis = None

try:
from redis.lock import Lock
except ImportError:
raise ImportError(
"You need to install the redis library in order to use Redis"
" backend (pip install redis)")


def get_redis(settings):
global redis
Expand All @@ -70,10 +76,12 @@ def get_redis(settings):


class Redis(object):
"""Redis backend."""
"""Redis locking backend."""

def __init__(self, settings):
self._redis = get_redis(settings)
self.blocking_timeout = settings.get("blocking_timeout", 1)
self.blocking = settings.get("blocking", False)

@property
def redis(self):
Expand All @@ -83,20 +91,22 @@ def redis(self):
def raise_or_lock(self, key, timeout):
"""
Checks if the task is locked and raises an exception, else locks
the task.
the task. By default, the tasks and the key expire after 60 minutes.
(meaning it will not be executed and the lock will clear).
"""
now = now_unix()
# Check if the tasks is already queued if key is in cache.
result = self.redis.get(key)
if result:
# Work out how many seconds remaining till the task timeout.
remaining = int(result) - now
if remaining > 0:
raise AlreadyQueued(remaining)

# By default, the tasks and the key expire after 60 minutes.
# (meaning it will not be executed and the lock will clear).
self.redis.setex(key, timeout, now + timeout)
acquired = Lock(
self.redis,
key,
timeout=timeout,
blocking=self.blocking,
blocking_timeout=self.blocking_timeout
).acquire()

if not acquired:
# Time remaining in milliseconds
# https://redis.io/commands/pttl
ttl = self.redis.pttl(key)
raise AlreadyQueued(ttl / 1000.)

def clear_lock(self, key):
"""Remove the lock from redis."""
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ pytest==2.9.2
pytest-cov==1.8.1
pytest-mock==1.6.0
python-coveralls==2.4.3
fakeredis==0.5.1
fakeredis==0.9.0
mock==1.0.1
freezegun==0.2.8
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

requirements = [
"celery",
"redis"
"redis>=2.10.2"
]

__version__ = ''
Expand Down
24 changes: 17 additions & 7 deletions tests/integration/backends/test_redis.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import pytest
from freezegun import freeze_time
import time
from fakeredis import FakeStrictRedis

from celery import Celery
from celery_once import QueueOnce, AlreadyQueued
from redis.lock import Lock as RedisLock


@pytest.fixture()
Expand Down Expand Up @@ -51,11 +52,15 @@ def test_delay_already_queued(redis):
pass


@freeze_time("2012-01-14") # Time since epoch = 1326499200
def test_delay_expired(redis):
# Fallback, key should of been timed out.
redis.set("qo_example_a-1", 1326499200 - 60 * 60)
lock = RedisLock(redis, "qo_example_a-1", timeout=1)
lock.acquire()

assert redis.get("qo_example_a-1") is not None

time.sleep(1)
example.delay(1)

assert redis.get("qo_example_a-1") is None


Expand All @@ -79,8 +84,13 @@ def test_already_queued_graceful(redis):
assert result.result is None


@freeze_time("2012-01-14") # Time since epoch = 1326499200
def test_apply_async_expired(redis):
# Fallback, key should of been timed out.
redis.set("qo_example_a-1", 1326499200 - 60 * 60)
lock = RedisLock(redis, "qo_example_a-1", timeout=1)
lock.acquire()

assert redis.get("qo_example_a-1") is not None

time.sleep(1)
example.apply_async(args=(1, ))

assert redis.get("qo_example_a-1") is None
21 changes: 12 additions & 9 deletions tests/unit/backends/test_redis.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import pytest
from freezegun import freeze_time
import time
from fakeredis import FakeStrictRedis

from celery_once.backends.redis import parse_url, Redis
from celery_once.tasks import AlreadyQueued
from redis.lock import Lock as RedisLock


def test_parse_redis_details_tcp_default_args():
Expand Down Expand Up @@ -59,27 +60,29 @@ def backend():
return backend


@freeze_time("2012-01-14") # Time since epoch = 1326499200
def test_redis_raise_or_lock(redis, backend):
assert redis.get("test") is None
backend.raise_or_lock(key="test", timeout=60)
assert redis.get("test") is not None


@freeze_time("2012-01-14") # Time since epoch = 1326499200
def test_redis_raise_or_lock_locked(redis, backend):
# Set to expire in 30 seconds!
redis.set("test", 1326499200 + 30)
lock = RedisLock(redis, "test", timeout=30)
lock.acquire()

with pytest.raises(AlreadyQueued) as e:
backend.raise_or_lock(key="test", timeout=60)
assert e.value.countdown == 30
assert e.value.message == "Expires in 30 seconds"

assert e.value.countdown == 30.0
assert e.value.message == "Expires in 30.0 seconds"


@freeze_time("2012-01-14") # Time since epoch = 1326499200
def test_redis_raise_or_lock_locked_and_expired(redis, backend):
# Set to have expired 30 ago seconds!
redis.set("test", 1326499200 - 30)
lock = RedisLock(redis, "test", timeout=1)
lock.acquire()
time.sleep(1) # wait for lock to expire

backend.raise_or_lock(key="test", timeout=60)
assert redis.get("test") is not None

Expand Down

0 comments on commit 17b8114

Please sign in to comment.