From dcdcd8302fae66120aa34fe86c608323329d9d53 Mon Sep 17 00:00:00 2001 From: PierreC Date: Wed, 21 Jul 2021 11:51:59 +0200 Subject: [PATCH 1/3] Adding "keep_lock_until_timeout" param "keep_lock_until_timeout" is used to prevent same calls within a time interval defined by "default_timeout" configuration --- celery_once/tasks.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/celery_once/tasks.py b/celery_once/tasks.py index 04ce832..d4cb2d6 100644 --- a/celery_once/tasks.py +++ b/celery_once/tasks.py @@ -21,7 +21,8 @@ class QueueOnce(Task): abstract = True once = { 'graceful': False, - 'unlock_before_run': False + 'unlock_before_run': False, + 'keep_lock_until_timeout': False } """ @@ -61,6 +62,9 @@ def default_timeout(self): def unlock_before_run(self): return self.once.get('unlock_before_run', False) + + def keep_lock_until_timeout(self): + return self.once.get('keep_lock_until_timeout', False) def __init__(self, *args, **kwargs): self._signature = signature(self.run) @@ -132,10 +136,10 @@ def get_key(self, args=None, kwargs=None): def after_return(self, status, retval, task_id, args, kwargs, einfo): """ After a task has run (both successfully or with a failure) clear the - lock if "unlock_before_run" is False. + lock if "unlock_before_run" and "keep_lock_until_timeout" are False. """ # Only clear the lock after the task's execution if the # "unlock_before_run" option is False - if not self.unlock_before_run(): + if not self.unlock_before_run() and not self.keep_lock_until_timeout(): key = self.get_key(args, kwargs) self.once_backend.clear_lock(key) From 2e408b4d16c85a0213b8a0c23872859148ca8a0a Mon Sep 17 00:00:00 2001 From: PierreC Date: Mon, 20 Sep 2021 16:47:24 +0200 Subject: [PATCH 2/3] Fix : Now working correctly --- celery_once/tasks.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/celery_once/tasks.py b/celery_once/tasks.py index d4cb2d6..0c68294 100644 --- a/celery_once/tasks.py +++ b/celery_once/tasks.py @@ -27,15 +27,12 @@ class QueueOnce(Task): """ 'There can be only one'. - Highlander (1986) - An abstract tasks with the ability to detect if it has already been queued. When running the task (through .delay/.apply_async) it checks if the tasks is not already queued. By default it will raise an an AlreadyQueued exception if it is, by you can silence this by including `once={'graceful': True}` in apply_async or in the task's settings. - Example: - >>> from celery_queue.tasks import QueueOnce >>> from celery import task >>> @task(base=QueueOnce, once={'graceful': True}) @@ -64,7 +61,7 @@ def unlock_before_run(self): return self.once.get('unlock_before_run', False) def keep_lock_until_timeout(self): - return self.once.get('keep_lock_until_timeout', False) + return self.once_config['settings'].get('keep_lock_until_timeout', False) def __init__(self, *args, **kwargs): self._signature = signature(self.run) @@ -82,7 +79,6 @@ def apply_async(self, args=None, kwargs=None, **options): """ Attempts to queues a task. Will raises an AlreadyQueued exception if already queued. - :param \*args: positional arguments passed on to the task. :param \*\*kwargs: keyword arguments passed on to the task. :keyword \*\*once: (optional) @@ -93,14 +89,13 @@ def apply_async(self, args=None, kwargs=None, **options): An `int' number of seconds after which the lock will expire. If not set, defaults to 1 hour. :param: keys: (optional) - """ once_options = options.get('once', {}) once_graceful = once_options.get( 'graceful', self.once.get('graceful', False)) once_timeout = once_options.get( 'timeout', self.once.get('timeout', self.default_timeout)) - + if not options.get('retries'): key = self.get_key(args, kwargs) try: From bab589937db874b29dd0bd0b6c178debaf78203a Mon Sep 17 00:00:00 2001 From: PierreC Date: Mon, 20 Sep 2021 17:01:59 +0200 Subject: [PATCH 3/3] Update README.rst --- README.rst | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 79efc77..978410d 100644 --- a/README.rst +++ b/README.rst @@ -38,7 +38,8 @@ Once installed, you'll need to configure a few options a ``ONCE`` key in celery' 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://localhost:6379/0', - 'default_timeout': 60 * 60 + 'default_timeout': 60 * 60, + 'keep_lock_until_timeout': False } } @@ -176,7 +177,24 @@ By default, the lock is removed after the task has executed (using celery's `aft return "Done!" +``keep_lock_until_timeout`` +----------- +By default, when the lock is removed at the end of the task of at the end of the timeout, a new similar task can be pushed in the queue. +If you set ``'keep_lock_until_timeout' : True`` in the conf, no matter if the task has ended or not, you can't push the same task until the time if over the timeout. +.. code:: python + + celery.conf.ONCE = { + 'backend': 'celery_once.backends.Redis', + 'settings': { + 'url': 'redis://localhost:6379/0', + 'default_timeout': 60 * 60, + 'keep_lock_until_timeout': True + } + } + + + Backends ========