Skip to content

Commit

Permalink
Add a Task class specialised for Django (celery#8491)
Browse files Browse the repository at this point in the history
* Add a Task class for Django

* Automatically use specialised Django Task class

* Add unit tests for specialized Django task

* Don't use specialized Django task if customised by user

* Add patch to avoid side effects with other tests

* Rename task class to DjangoTask

* Add versionadded

* Add reference page for new DjangoTask

* Fix generation of reference documentation for DjangoTask

* Fix links & extend documentation

* Fix link to base task in docs

* Improve links in DjangoTask docs

* Improve more links in DjangoTask docs

* Apply suggestions from code review

Co-authored-by: Asif Saif Uddin <[email protected]>

* Update Django example to demo the new delay_on_commit() method

* Replace try/catch ImportError for documentation by autodoc_mock_imports

---------

Co-authored-by: Asif Saif Uddin <[email protected]>
  • Loading branch information
browniebroke and auvipy authored Jan 29, 2024
1 parent 86895a9 commit da1146a
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 3 deletions.
1 change: 1 addition & 0 deletions celery/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ def __init__(self, main=None, loader=None, backend=None,
self.loader_cls = loader or self._get_default_loader()
self.log_cls = log or self.log_cls
self.control_cls = control or self.control_cls
self._custom_task_cls_used = bool(task_cls)
self.task_cls = task_cls or self.task_cls
self.set_as_current = set_as_current
self.registry_cls = symbol_by_name(self.registry_cls)
Expand Down
Empty file.
21 changes: 21 additions & 0 deletions celery/contrib/django/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import functools

from django.db import transaction

from celery.app.task import Task


class DjangoTask(Task):
"""
Extend the base :class:`~celery.app.task.Task` for Django.
Provide a nicer API to trigger tasks at the end of the DB transaction.
"""

def delay_on_commit(self, *args, **kwargs):
"""Call :meth:`~celery.app.task.Task.delay` with Django's ``on_commit()``."""
return transaction.on_commit(functools.partial(self.delay, *args, **kwargs))

def apply_async_on_commit(self, *args, **kwargs):
"""Call :meth:`~celery.app.task.Task.apply_async` with Django's ``on_commit()``."""
return transaction.on_commit(functools.partial(self.apply_async, *args, **kwargs))
3 changes: 3 additions & 0 deletions celery/fixups/django.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ def install(self) -> "DjangoFixup":
self._settings = symbol_by_name('django.conf:settings')
self.app.loader.now = self.now

if not self.app._custom_task_cls_used:
self.app.task_cls = 'celery.contrib.django.task:DjangoTask'

signals.import_modules.connect(self.on_import_modules)
signals.worker_init.connect(self.on_worker_init)
return self
Expand Down
3 changes: 2 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
r'^http://localhost'
],
autodoc_mock_imports=[
'riak'
'riak',
'django',
]
))

Expand Down
58 changes: 58 additions & 0 deletions docs/django/first-steps-with-django.rst
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,64 @@ concrete app instance:
You can find the full source code for the Django example project at:
https://github.com/celery/celery/tree/main/examples/django/

Trigger tasks at the end of the database transaction
----------------------------------------------------

A common pitfall with Django is triggering a task immediately and not wait until
the end of the database transaction, which means that the Celery task may run
before all changes are persisted to the database. For example:

.. code-block:: python
# views.py
def create_user(request):
# Note: simplified example, use a form to validate input
user = User.objects.create(username=request.POST['username'])
send_email.delay(user.pk)
return HttpResponse('User created')
# task.py
@shared_task
def send_email(user_pk):
user = User.objects.get(pk=user_pk)
# send email ...
In this case, the ``send_email`` task could start before the view has committed
the transaction to the database, and therefore the task may not be able to find
the user.

A common solution is to use Django's `on_commit`_ hook to trigger the task
after the transaction has been committed:

.. _on_commit: https://docs.djangoproject.com/en/stable/topics/db/transactions/#django.db.transaction.on_commit

.. code-block:: diff
- send_email.delay(user.pk)
+ transaction.on_commit(lambda: send_email.delay(user.pk))
.. versionadded:: 5.4

Since this is such a common pattern, Celery 5.4 introduced a handy shortcut for this,
using a :class:`~celery.contrib.django.task.DjangoTask`. Instead of calling
:meth:`~celery.app.task.Task.delay`, you should call
:meth:`~celery.contrib.django.task.DjangoTask.delay_on_commit`:

.. code-block:: diff
- send_email.delay(user.pk)
+ send_email.delay_on_commit(user.pk)
This API takes care of wrapping the call into the `on_commit`_ hook for you.
In rare cases where you want to trigger a task without waiting, the existing
:meth:`~celery.app.task.Task.delay` API is still available.

This task class should be used automatically if you've follow the setup steps above.
However, if your app :ref:`uses a custom task base class <task-custom-classes>`,
you'll need inherit from :class:`~celery.contrib.django.task.DjangoTask` instead of
:class:`~celery.app.task.Task` to get this behaviour.

Extensions
==========

Expand Down
17 changes: 17 additions & 0 deletions docs/reference/celery.contrib.django.task.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
====================================
``celery.contrib.django.task``
====================================

.. versionadded:: 5.4

.. contents::
:local:

API Reference
=============

.. currentmodule:: celery.contrib.django.task

.. automodule:: celery.contrib.django.task
:members:
:undoc-members:
1 change: 1 addition & 0 deletions docs/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
celery.loaders.base
celery.states
celery.contrib.abortable
celery.contrib.django.task
celery.contrib.migrate
celery.contrib.pytest
celery.contrib.sphinx
Expand Down
8 changes: 7 additions & 1 deletion examples/django/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ Running a task
$ python ./manage.py shell
>>> from demoapp.tasks import add, mul, xsum
>>> res = add.delay(2,3)
>>> res = add.delay_on_commit(2, 3)
>>> res.get()
5
.. note::

The ``delay_on_commit`` method is only available when using Django,
and was added in Celery 5.4. If you are using an older version of Celery,
you can use ``delay`` instead.
Empty file.
32 changes: 32 additions & 0 deletions t/unit/contrib/django/test_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from unittest.mock import patch

import pytest


@pytest.mark.patched_module(
'django',
'django.db',
'django.db.transaction',
)
@pytest.mark.usefixtures("module")
class test_DjangoTask:
@pytest.fixture
def task_instance(self):
from celery.contrib.django.task import DjangoTask
yield DjangoTask()

@pytest.fixture(name="on_commit")
def on_commit(self):
with patch(
'django.db.transaction.on_commit',
side_effect=lambda f: f(),
) as patched_on_commit:
yield patched_on_commit

def test_delay_on_commit(self, task_instance, on_commit):
result = task_instance.delay_on_commit()
assert result is not None

def test_apply_async_on_commit(self, task_instance, on_commit):
result = task_instance.apply_async_on_commit()
assert result is not None
26 changes: 25 additions & 1 deletion t/unit/fixups/test_django.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ def test_init(self):
with self.fixup_context(self.app) as (f, importmod, sym):
assert f

def test_install(self, patching):
@pytest.mark.patched_module(
'django',
'django.db',
'django.db.transaction',
)
def test_install(self, patching, module):
self.app.loader = Mock()
self.cw = patching('os.getcwd')
self.p = patching('sys.path')
Expand All @@ -97,8 +102,27 @@ def test_install(self, patching):
f.install()
self.sigs.worker_init.connect.assert_called_with(f.on_worker_init)
assert self.app.loader.now == f.now

# Specialized Task class is used
assert self.app.task_cls == 'celery.contrib.django.task:DjangoTask'
from celery.contrib.django.task import DjangoTask
assert issubclass(f.app.Task, DjangoTask)
assert hasattr(f.app.Task, 'delay_on_commit')
assert hasattr(f.app.Task, 'apply_async_on_commit')

self.p.insert.assert_called_with(0, '/opt/vandelay')

def test_install_custom_user_task(self, patching):
patching('celery.fixups.django.signals')

self.app.task_cls = 'myapp.celery.tasks:Task'
self.app._custom_task_cls_used = True

with self.fixup_context(self.app) as (f, _, _):
f.install()
# Specialized Task class is NOT used
assert self.app.task_cls == 'myapp.celery.tasks:Task'

def test_now(self):
with self.fixup_context(self.app) as (f, _, _):
assert f.now(utc=True)
Expand Down

0 comments on commit da1146a

Please sign in to comment.