Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding "keep_lock_until_timeout" param #133

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ Once installed, you'll need to configure a few options a ``ONCE`` key in celery'
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
'default_timeout': 60 * 60,
'keep_lock_until_timeout': False
}
}

Expand Down Expand Up @@ -176,7 +177,24 @@ By default, the lock is removed after the task has executed (using celery's `aft
return "Done!"


``keep_lock_until_timeout``
-----------
By default, when the lock is removed at the end of the task of at the end of the timeout, a new similar task can be pushed in the queue.
If you set ``'keep_lock_until_timeout' : True`` in the conf, no matter if the task has ended or not, you can't push the same task until the time if over the timeout.

.. code:: python

celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60,
'keep_lock_until_timeout': True
}
}




Backends
========
Expand Down
17 changes: 8 additions & 9 deletions celery_once/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ class QueueOnce(Task):
abstract = True
once = {
'graceful': False,
'unlock_before_run': False
'unlock_before_run': False,
'keep_lock_until_timeout': False
}

"""
'There can be only one'. - Highlander (1986)

An abstract tasks with the ability to detect if it has already been queued.
When running the task (through .delay/.apply_async) it checks if the tasks
is not already queued. By default it will raise an
an AlreadyQueued exception if it is, by you can silence this by including
`once={'graceful': True}` in apply_async or in the task's settings.

Example:

>>> from celery_queue.tasks import QueueOnce
>>> from celery import task
>>> @task(base=QueueOnce, once={'graceful': True})
Expand All @@ -61,6 +59,9 @@ def default_timeout(self):

def unlock_before_run(self):
return self.once.get('unlock_before_run', False)

def keep_lock_until_timeout(self):
return self.once_config['settings'].get('keep_lock_until_timeout', False)

def __init__(self, *args, **kwargs):
self._signature = signature(self.run)
Expand All @@ -78,7 +79,6 @@ def apply_async(self, args=None, kwargs=None, **options):
"""
Attempts to queues a task.
Will raises an AlreadyQueued exception if already queued.

:param \*args: positional arguments passed on to the task.
:param \*\*kwargs: keyword arguments passed on to the task.
:keyword \*\*once: (optional)
Expand All @@ -89,14 +89,13 @@ def apply_async(self, args=None, kwargs=None, **options):
An `int' number of seconds after which the lock will expire.
If not set, defaults to 1 hour.
:param: keys: (optional)

"""
once_options = options.get('once', {})
once_graceful = once_options.get(
'graceful', self.once.get('graceful', False))
once_timeout = once_options.get(
'timeout', self.once.get('timeout', self.default_timeout))

if not options.get('retries'):
key = self.get_key(args, kwargs)
try:
Expand Down Expand Up @@ -132,10 +131,10 @@ def get_key(self, args=None, kwargs=None):
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""
After a task has run (both successfully or with a failure) clear the
lock if "unlock_before_run" is False.
lock if "unlock_before_run" and "keep_lock_until_timeout" are False.
"""
# Only clear the lock after the task's execution if the
# "unlock_before_run" option is False
if not self.unlock_before_run():
if not self.unlock_before_run() and not self.keep_lock_until_timeout():
key = self.get_key(args, kwargs)
self.once_backend.clear_lock(key)