Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add release_on_start task configuration #46

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ This is a baseclass for celery tasks that ensures only one instance of the task
- [Task configuration](#task-configuration)
- [unique\_on](#uniqueon)
- [raise\_on\_duplicate](#raiseonduplicate)
- [lock\_expiry](#lockexpiry)
- [release\_on\_start](#releaseonstart)
- [App Configuration](#app-configuration)
- [Testing](#testing)
- [Contribute](#contribute)
Expand Down Expand Up @@ -184,6 +186,25 @@ assert task1 != task2 # These are two separate task instances

This option can be applied globally in the [app config](#app-configuration) with `singleton_lock_expiry`. Task option supersedes the app config.

### release\_on\_start
_Warning: This is only available in celery versions 5.2 and above_

By default, the lock that a singleton task holds is released when the job is finished. By setting `release_on_start=True` on the task, the lock will be released before the task is run and is thus only held while the job is in the queue.

```python
@app.task(base=Singleton, release_on_start=True)
def runs_for_3_seconds():
self.time.sleep(3)


task1 = runs_for_3_seconds.delay()
time.sleep(1)
# task1 has started and is still running
task2 = runs_for_3_seconds.delay()

assert task1 != task2 # These are two separate task instances
```


## App Configuration

Expand Down
11 changes: 9 additions & 2 deletions celery_singleton/singleton.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Singleton(BaseTask):
unique_on = None
raise_on_duplicate = None
lock_expiry = None
release_on_start = False

@property
def _raise_on_duplicate(self):
Expand Down Expand Up @@ -145,8 +146,14 @@ def on_duplicate(self, existing_task_id):
)
return self.AsyncResult(existing_task_id)

def before_start(self, task_id, args, kwargs):
if self.release_on_start:
self.release_lock(task_args=args, task_kwargs=kwargs)

def on_failure(self, exc, task_id, args, kwargs, einfo):
self.release_lock(task_args=args, task_kwargs=kwargs)
if not self.release_on_start:
self.release_lock(task_args=args, task_kwargs=kwargs)

def on_success(self, retval, task_id, args, kwargs):
self.release_lock(task_args=args, task_kwargs=kwargs)
if not self.release_on_start:
self.release_lock(task_args=args, task_kwargs=kwargs)
31 changes: 31 additions & 0 deletions tests/test_singleton.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from celery_singleton.singleton import Singleton, clear_locks
from celery_singleton import util, DuplicateTaskError
from celery_singleton.backends.redis import RedisBackend
from celery import version_info as celery_version
from celery_singleton.backends import get_backend
from celery_singleton.config import Config

Expand Down Expand Up @@ -369,3 +370,33 @@ def simple_task(*args):
mock_lock.assert_called_once_with(
simple_task.singleton_backend, lock, result.task_id, expiry=60
)


class TestReleaseOnStart:
@pytest.mark.skipif(celery_version < (5, 2), reason="Feature requires least celery-5.2")
def test__release_on_start(
self, scoped_app, celery_worker
) :
with scoped_app:
@celery_worker.app.task(base=Singleton, release_on_start=True)
def queue_only_task(a):
time.sleep(1)
return a * 2

celery_worker.reload() # So task is registered

# `task1` is in queue, but not started
task1 = queue_only_task.apply_async(args=[1], countdown=1)
time.sleep(0.05) # small delay for on_success
# `task2` is in queue, but not started
task2 = queue_only_task.apply_async(args=[1])
# Both tasks were in the queue
assert task1 == task2

time.sleep(1.10)
# `task1` is now started (and the lock is released)
task3 = queue_only_task.apply_async(args=[1])
time.sleep(0.05) # small delay for on_success
task3.get()
# `task1` had started executing, so `task3` was free to be scheduled
assert task1 != task3