Make Django test case database visible to Celery

34👍

This is possible by starting a Celery worker within the Django test case.

Background

When the test database is SQLite, Django uses an in-memory database by default. As the SQLite documentation on in-memory databases says, "[A]ll database connections sharing the in-memory database need to be in the same process." This means that, as long as Django uses an in-memory test database and Celery runs in a separate process, Celery and Django cannot share a test database.

However, with celery.contrib.testing.worker.start_worker, it is possible to start a Celery worker in a separate thread within the same process. This worker can access the in-memory database.

This assumes that Celery is already set up in the usual way with the Django project.
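The same-process requirement can be illustrated with the standard library alone. The following is a minimal sketch, not Django's or Celery's actual code: the URI name demo_shared_mem and the function read_from_worker_thread are made up for illustration. It opens a shared-cache in-memory SQLite database, writes a row from the main thread, and reads it back from a second thread in the same process, which is roughly the situation of a test and an in-process worker thread.

```python
import sqlite3
import threading

# Hypothetical URI: a named in-memory database with a shared cache, so that
# several connections inside ONE process see the same data.
URI = "file:demo_shared_mem?mode=memory&cache=shared"


def read_from_worker_thread():
    # Keep this connection open: the in-memory database disappears when the
    # last connection to it is closed.
    main_conn = sqlite3.connect(URI, uri=True)
    main_conn.execute("CREATE TABLE item (name TEXT)")
    main_conn.execute("INSERT INTO item VALUES ('from main thread')")
    main_conn.commit()

    seen = []

    def worker():
        # A second connection, opened from another thread in the same process.
        conn = sqlite3.connect(URI, uri=True)
        try:
            seen.extend(row[0] for row in conn.execute("SELECT name FROM item"))
        finally:
            conn.close()

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    main_conn.close()
    return seen
```

A connection opened with the same URI from a different process would get its own, empty database, which is why a separately started worker cannot see the test data.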

Solution

Because this Django and Celery setup involves some cross-thread communication, only test cases that don’t run in isolated transactions will work; changes made inside a test’s uncommitted transaction are not visible to the worker’s own database connection. The test case must inherit directly from SimpleTestCase or its Django REST framework equivalent, APISimpleTestCase, and set databases to '__all__' or to just the databases that the test interacts with.

The key is to start a Celery worker in the setUpClass method of the test case and close it in the tearDownClass method. The key function is celery.contrib.testing.worker.start_worker. It requires an instance of the current Celery app, presumably obtained from mysite.celery.app, and returns a Python context manager, whose __enter__ and __exit__ methods must be called in setUpClass and tearDownClass, respectively. There is probably a way to avoid manually entering and exiting the context manager with a decorator or something, but I couldn’t figure it out. Here is an example tests.py file:

from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase

from mysite.celery import app

class BatchSimulationTestCase(SimpleTestCase):
    databases = '__all__'

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        # Start up celery worker
        cls.celery_worker = start_worker(app, perform_ping_check=False)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

        # Close worker
        cls.celery_worker.__exit__(None, None, None)

    def test_my_function(self):
        # Call my_task.delay() here and assert on its result
        pass
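On the question of avoiding the manual __enter__ and __exit__ calls: one option, sketched here under assumptions, is contextlib.ExitStack combined with unittest's addClassCleanup (Python 3.8+). The fake_worker context manager below is a hypothetical stand-in for start_worker(app, perform_ping_check=False), used so the sketch runs without Celery; WorkerTestCase, events, and run_demo are likewise made-up names.

```python
import contextlib
import io
import unittest

events = []  # records what the stand-in worker does


@contextlib.contextmanager
def fake_worker():
    """Hypothetical stand-in for start_worker(app, perform_ping_check=False)."""
    events.append("started")
    try:
        yield "worker"
    finally:
        events.append("stopped")


class WorkerTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        stack = contextlib.ExitStack()
        # enter_context() calls __enter__ now; stack.close() calls __exit__ later.
        cls.worker = stack.enter_context(fake_worker())
        # Class cleanups run after tearDownClass, even if setUpClass fails later.
        cls.addClassCleanup(stack.close)

    def test_worker_is_running(self):
        self.assertEqual(events, ["started"])


def run_demo():
    """Run the test case once and report whether it passed."""
    events.clear()
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(WorkerTestCase)
    result = unittest.TextTestRunner(stream=io.StringIO()).run(suite)
    return result.wasSuccessful()
```

With the real worker, the enter_context call would wrap start_worker instead of fake_worker, and no tearDownClass override would be needed. On Python 3.11 and later, unittest also provides enterClassContext, which combines the two steps.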

By default, the testing worker expects a task named 'celery.ping' to be registered, apparently so that it can check that the worker has actually started. The task it is looking for is celery.contrib.testing.tasks.ping, which is not registered at test time unless that module has been imported. Setting the perform_ping_check argument of start_worker to False skips this check and avoids the associated error.

Now, when the tests are run, there is no need to start a separate Celery process. A Celery worker will be started in the Django test process as a separate thread. This worker can see any in-memory databases, including the default in-memory test database. start_worker has options for controlling the number of workers, such as its concurrency argument, and the default appears to be a single worker.

3👍

For your unit tests I would recommend skipping the Celery dependency. The two following links will provide you with the necessary information to start your unit tests:

If you really want to test the Celery function calls including a queue, I’d probably set up a Docker Compose configuration with the server, worker, and queue combination and extend the custom CeleryTestRunner from the django-celery docs. But I don’t see a benefit in it, because the test system is probably too far from production to be representative.

0👍

I found another workaround, based on @drhagen’s solution:

Call celery.contrib.testing.app.TestApp() before calling start_worker(app):

from celery.contrib.testing.worker import start_worker
from celery.contrib.testing.app import TestApp

from myapp.tasks import app, my_task


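class TestTasks:
    # setup_method/teardown_method are pytest's per-test hooks; the
    # nose-style setup/teardown names are no longer supported by recent pytest
    def setup_method(self):
        # Per this workaround, create a TestApp before starting the worker
        TestApp()
        self.celery_worker = start_worker(app)
        self.celery_worker.__enter__()

    def teardown_method(self):
        self.celery_worker.__exit__(None, None, None)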

0👍

I found a useful way to test Celery tasks without writing to the main database.

You can use unittest.mock.patch to replace the task’s delay method with the task function itself.

from unittest.mock import patch

from rest_framework.test import APITestCase

from myapp.models import MyModel
from myapp.util import get_result_from_response
# Placeholder names: import the task from the module named in the patch target
from appName.views.yourView import yourCeleryTask


class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # Saved inside the test transaction, so a separate worker could not see it
        MyModel(id='test_object').save()

    # Replace .delay with the task itself so it runs synchronously in the test process
    @patch('appName.views.yourView.yourCeleryTask.delay', new=yourCeleryTask)
    def test_celery_integration(self):
        # This view normally spawns a Celery task; with the patch, the task body
        # runs inside the view and can see MyModel.objects.get(id='test_object')
        http_response = self.client.post('/', 'test_data', format='json')

        # The task has already finished when the response comes back, so there is
        # nothing to wait for (assumes the view passes on delay()'s return value)
        result = get_result_from_response(http_response)
        # Objects saved by the task are rolled back with the test transaction

Hope this helps. Instead of handing the task to a Celery worker, the view itself runs the task during the test.
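The patching idea can be tried without Celery or Django. This is a minimal sketch under assumptions: FakeTask, add, view, and run_view_synchronously are hypothetical names, and FakeTask only imitates the one aspect that matters here, namely that calling delay would need a broker while calling the task directly does not.

```python
from unittest.mock import patch


class FakeTask:
    """Hypothetical stand-in for a Celery task object."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        # Calling the task directly runs its body in the current process.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # A real task would send a message to a broker here.
        raise RuntimeError("no broker available in this sketch")


@FakeTask
def add(x, y):
    return x + y


def view(x, y):
    # Code under test: it only knows how to queue the task.
    return add.delay(x, y)


def run_view_synchronously(x, y):
    # While the patch is active, add.delay IS add, so the view runs the task inline.
    with patch.object(add, "delay", new=add):
        return view(x, y)
```

Calling run_view_synchronously(2, 3) runs the task body inside view and returns its result; once the with block exits, add.delay is restored and raises again.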
