34👍
This is possible by starting a Celery worker within the Django test case.
Background
Django’s in-memory test database is SQLite. As the SQLite documentation says about in-memory databases, "[A]ll database connections sharing the in-memory database need to be in the same process." This means that, as long as Django uses an in-memory test database and Celery runs in a separate process, it is fundamentally impossible for Celery and Django to share a test database.
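The isolation can be demonstrated with the standard library alone. Even within a single process, two plain connections to ':memory:' each get a private database; a connection from a separate worker process has strictly less chance of sharing state:

```python
import sqlite3

# Each plain connection to ':memory:' gets its own private database.
conn_a = sqlite3.connect(':memory:')
conn_b = sqlite3.connect(':memory:')

conn_a.execute('CREATE TABLE items (name TEXT)')
conn_a.execute("INSERT INTO items VALUES ('from_conn_a')")
conn_a.commit()

# conn_b cannot see the table created on conn_a.
tables = conn_b.execute(
    "SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
print(tables)  # → []
```

This is why a worker thread (same process, same connection pool) can see the test data, while a worker process cannot.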
However, with celery.contrib.testing.worker.start_worker, it is possible to start a Celery worker in a separate thread within the same process. This worker can access the in-memory database.
This assumes that Celery is already set up in the usual way with the Django project.
Solution
Because Django-Celery involves some cross-thread communication, only test cases that don’t run in isolated transactions will work. The test case must inherit directly from SimpleTestCase, or its REST framework equivalent APISimpleTestCase, and set databases to '__all__' (or to just the databases that the test interacts with).
The key is to start a Celery worker in the setUpClass method of the TestCase and close it in the tearDownClass method. The central function is celery.contrib.testing.worker.start_worker, which requires an instance of the current Celery app (presumably obtained from mysite.celery.app) and returns a Python context manager, with __enter__ and __exit__ methods that must be called in setUpClass and tearDownClass, respectively. There is probably a way to avoid manually entering and exiting the context manager with a decorator or something, but I couldn’t figure it out. Here is an example tests.py file:
from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase

from mysite.celery import app


class BatchSimulationTestCase(SimpleTestCase):
    databases = '__all__'

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        # Start up celery worker
        cls.celery_worker = start_worker(app, perform_ping_check=False)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

        # Close worker
        cls.celery_worker.__exit__(None, None, None)

    def test_my_function(self):
        # my_task.delay() or something
        ...
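As an aside on avoiding the manual __enter__/__exit__ calls: unittest’s addClassCleanup (Python 3.8+) combined with contextlib.ExitStack can enter the context manager and guarantee it is exited, even if setUpClass fails partway. A sketch using a stand-in context manager in place of start_worker(app):

```python
import contextlib
import unittest


@contextlib.contextmanager
def fake_worker():
    # Stand-in for start_worker(app): yields a "worker" and records shutdown.
    state = {'running': True}
    try:
        yield state
    finally:
        state['running'] = False


class WorkerTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        stack = contextlib.ExitStack()
        # enter_context calls __enter__ now; stack.close will call __exit__.
        cls.worker = stack.enter_context(fake_worker())
        # Class cleanups run after tearDownClass, even on setUpClass errors.
        cls.addClassCleanup(stack.close)

    def test_worker_is_running(self):
        self.assertTrue(self.worker['running'])
```

The same shape should work with start_worker(app, perform_ping_check=False) substituted for fake_worker(), removing the need for an explicit tearDownClass.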
For whatever reason, the testing worker tries to use a task called 'celery.ping', probably to provide better error messages in the case of worker failure. The task it is looking for is celery.contrib.testing.tasks.ping, which is not available at test time. Setting the perform_ping_check argument of start_worker to False skips this check and avoids the associated error.
Now, when the tests are run, there is no need to start a separate Celery process. A Celery worker will be started in the Django test process as a separate thread. This worker can see any in-memory databases, including the default in-memory test database. To control the number of workers, there are options available in start_worker, but the default appears to be a single worker.
3👍
For your unit tests I would recommend skipping the Celery dependency; the two following links will provide you with the necessary info to start your unit tests:
- http://docs.celeryproject.org/projects/django-celery/en/2.4/cookbook/unit-testing.html
- http://docs.celeryproject.org/en/latest/userguide/testing.html
If you really want to test the Celery function calls, including a queue, I’d probably set up a docker-compose with the server, worker, and queue combination and extend the custom CeleryTestRunner from the django-celery docs. But I wouldn’t see a benefit from it, because the test system is probably too far away from production to be representative.
0👍
I found another workaround based on @drhagen’s solution: call celery.contrib.testing.app.TestApp() before calling start_worker(app):
from celery.contrib.testing.worker import start_worker
from celery.contrib.testing.app import TestApp

from myapp.tasks import app, my_task


class TestTasks:
    def setup(self):
        TestApp()
        self.celery_worker = start_worker(app)
        self.celery_worker.__enter__()

    def teardown(self):
        self.celery_worker.__exit__(None, None, None)
0👍
I found a useful way to test Celery tasks without writing to the main database.
You can use unittest.mock.patch to replace the Celery task’s delay call.
import json
from unittest.mock import patch

from rest_framework.test import APITestCase

from myapp.models import MyModel
from myapp.util import get_result_from_response
from appName.views.yourView import yourCeleryTask  # the task being patched


class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    @patch('appName.views.yourView.yourCeleryTask.delay', new=yourCeleryTask)
    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')
        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist
Hope this helps. Instead of a Celery worker performing the task, the view itself performs the task synchronously while testing.
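The core of this trick can be shown without Django or Celery at all: patch .delay so the task body runs synchronously in the caller’s thread. Everything below is a self-contained illustration; my_task and view are hypothetical stand-ins for a real Celery task and the view that enqueues it:

```python
from types import SimpleNamespace
from unittest.mock import patch


def _run(x):
    # Stand-in for the task body that a real worker would execute.
    return x * 2


# Model a Celery task: .delay() would enqueue work and needs a broker,
# while calling the body directly runs it synchronously.
def _delay(x):
    raise RuntimeError('needs a broker')


my_task = SimpleNamespace(run=_run, delay=_delay)


def view(x):
    # Production code path: enqueue the task.
    return my_task.delay(x)


# In the test, replace .delay with the synchronous body.
with patch.object(my_task, 'delay', new=my_task.run):
    result = view(21)

print(result)  # → 42
```

Outside the patch context, my_task.delay reverts to the original broker-dependent call, so only the test runs the task inline.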