Using mock to patch a Celery task in Django unit tests

60👍

The issue that you are having is unrelated to the fact that this is a Celery task. You just happen to be patching the wrong thing. 😉

Specifically, a patch has to be applied where the name is looked up, not where it is defined. Find the view (or other module) that imports “mytask” and patch it there, so the relevant line would look like this:

with patch('myapp.myview.mytask.delay') as mock_task:

The mock documentation explains this in more detail under "Where to patch":

http://www.voidspace.org.uk/python/mock/patch.html#where-to-patch
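
The lookup rule behind this advice can be sketched without Celery or Django. This is a minimal illustration, not the asker's code: `demo_tasks`, `demo_view`, `FakeTask` and `handle` are hypothetical stand-ins for the tasks module, the view module, the task object and the view function. It replaces the imported name itself, which is the case where the patch location matters.

```python
import sys
import types
from unittest.mock import patch


class FakeTask:
    """Hypothetical stand-in for a task object; not Celery's Task."""

    def delay(self, *args):
        return "real"


# Stand-in for myapp/tasks.py: the module that defines the task.
tasks_mod = types.ModuleType("demo_tasks")
tasks_mod.mytask = FakeTask()
sys.modules["demo_tasks"] = tasks_mod

# Stand-in for myapp/myview.py: it holds its own reference to the task,
# as "from demo_tasks import mytask" would create.
view_mod = types.ModuleType("demo_view")
view_mod.mytask = tasks_mod.mytask
sys.modules["demo_view"] = view_mod


def handle(x):
    # The view looks the name up in its own namespace at call time.
    return view_mod.mytask.delay(x)


def call_with_patch(target):
    with patch(target) as mock_task:
        mock_task.delay.return_value = "mocked"
        return handle(1)


# Patching the defining module leaves the view's reference untouched.
wrong = call_with_patch("demo_tasks.mytask")
# Patching the name in the importing module is what the view actually sees.
right = call_with_patch("demo_view.mytask")
```

Here `wrong` is the real return value and `right` is the mocked one, because only the second patch replaces the name that `handle` resolves.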

44👍

The @task decorator replaces the function with a Task object (see documentation). If you mock the task itself you’ll replace the (somewhat magic) Task object with a MagicMock and it won’t schedule the task at all. Instead mock the Task object’s run() method. With CELERY_TASK_ALWAYS_EAGER=True, delay() executes the task synchronously in the test process, so the mocked run() is called right away:

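from unittest.mock import patch

from django.test import override_settings

# Method of a TestCase subclass; tasks and ApiUserFactory come from the project under test.
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
@patch('monitor.tasks.monitor_user.run')
def test_monitor_all(self, monitor_user):
    """
    Test monitor.all task
    """

    user = ApiUserFactory()
    tasks.monitor_all.delay()
    # monitor_user is the mock that replaced monitor_user.run
    monitor_user.assert_called_once_with(user.key)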
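
To see why patching run() keeps the dispatch path intact, here is a toy model that runs without Celery. `ToyTask` is an assumption made purely for illustration: it only imitates the idea that, in eager mode, delay() ends up calling run() in the same process. It is not Celery's real Task class, and "key-1" is a made-up value.

```python
from unittest.mock import patch


class ToyTask:
    """Toy stand-in for an eagerly executed task (not Celery's implementation)."""

    def __init__(self, func):
        self.func = func

    def run(self, *args, **kwargs):
        # The task body.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # Eager mode in this toy model: call run() directly, no broker involved.
        return self.run(*args, **kwargs)


calls = []  # records executions of the real monitor_user body
monitor_user = ToyTask(lambda key: calls.append(key))
monitor_all = ToyTask(lambda: monitor_user.delay("key-1"))

# Only run() is replaced, so monitor_all still goes through monitor_user.delay().
with patch.object(monitor_user, "run") as run_mock:
    monitor_all.delay()

run_mock.assert_called_once_with("key-1")
```

The real body never runs (`calls` stays empty), yet the mock still records the arguments that delay() passed along, which is what the test above asserts.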

3👍

Just patch the delay method on Celery's Task class, which stubs it out for every task at once (mocker is the pytest-mock fixture):

mocker.patch("celery.app.task.Task.delay", return_value=1)
👤jTiKey
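
A rough sketch of what a class-level patch like this does, using plain unittest.mock instead of pytest-mock. `ToyTask`, `send_email` and `resize_image` are hypothetical stand-ins, not part of Celery.

```python
from unittest.mock import call, patch


class ToyTask:
    """Toy stand-in for celery.app.task.Task (an assumption for illustration)."""

    def delay(self, *args, **kwargs):
        raise RuntimeError("would contact the broker")


send_email = ToyTask()
resize_image = ToyTask()

# Patching the attribute on the class affects every instance.
with patch.object(ToyTask, "delay", return_value=1) as delay_mock:
    results = [send_email.delay("a@example.com"), resize_image.delay(42)]

recorded = delay_mock.call_args_list
```

Both calls return 1 and land on the same mock. The mock records the arguments but not which task they were sent to, so this approach suits tests that only need delay() to be harmless; the more targeted patches above are a better fit for asserting on one particular task.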
