[Django]-Resolving circular imports in celery and django

1👍

Use signals.

tasks.py

from .models import MyModel, my_signal
from celery import shared_task
from django.dispatch import receiver

@shared_task
def my_task(id):
    qs = MyModel.objects.filter(some_field=id)
    for record in qs:
        my_value = ...  # do some computations
        record.my_field = my_value
        record.save()

@receiver(my_signal)
def my_receiver(sender, **kwargs):
    my_task.delay(kwargs['id'])

models.py

from django.db import models
from django.dispatch import Signal

my_signal = Signal()  # carries an 'id' kwarg (providing_args was removed in Django 4.0)

class MyModel(models.Model):
    field1 = models.IntegerField()
    #more fields
    my_field = models.FloatField(null=True)

    @staticmethod
    def load_from_file(file):
        #parse file, set fields from file
        my_signal.send(sender=?, id=?)
👤joshua

23👍

The solution posted by joshua is very good, but when I first tried it, I found that my @receiver decorators had no effect. That was because the tasks module was never imported in the process sending the signal — which was expected, since I relied on Celery's task auto-discovery.
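One way to get those receivers connected is to import the tasks module once the app registry is ready, for example from AppConfig.ready() — a minimal sketch, assuming the app is called myapp:

myapp/apps.py

from django.apps import AppConfig

class MyAppConfig(AppConfig):
    name = 'myapp'

    def ready(self):
        # Importing tasks here registers the @receiver handlers in every
        # process that loads Django, without a module-level import cycle.
        from . import tasks  # noqa: F401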

There is, however, another way to decouple tasks.py from models.py. Namely, tasks can be sent by name, so they don't have to be evaluated (imported) in the process that sends them:

from django.db import models
#from tasks import my_task
import celery

class MyModel(models.Model):
    field1 = models.IntegerField()
    #more fields
    my_field = models.FloatField(null=True)

    @staticmethod
    def load_from_file(file):
        #parse file, set fields from file
        #my_task.delay(id)
        celery.current_app.send_task('myapp.tasks.my_task', (id,))

send_task() is a method on Celery app objects.

With this solution it is important to take care of correct, predictable names for your tasks, since they are referenced only by string.
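If you rely on string names, it can help to pin the name explicitly on the task instead of depending on the module path — a minimal sketch, assuming the task lives in myapp/tasks.py:

from celery import shared_task

@shared_task(name='myapp.tasks.my_task')
def my_task(id):
    ...

With the name pinned, send_task('myapp.tasks.my_task', (id,)) keeps working even if the module is later moved or renamed.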

19👍

In your models, instead of importing my_task at the top of the file, you can import it just before you use it. This solves the circular import problem.

from django.db import models

class MyModel(models.Model):
    field1 = models.IntegerField()
    #more fields
    my_field = models.FloatField(null=True)

    @staticmethod
    def load_from_file(file):
        #parse file, set fields from file
        from .tasks import my_task   # import here instead of at the top
        my_task.delay(id)

Alternatively, you can do the same thing in your tasks.py: import your models just before you use them instead of at the top of the file.
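A minimal sketch of that reverse direction (the model and field names are placeholders):

tasks.py

from celery import shared_task

@shared_task
def my_task(id):
    # Import the model inside the task body, so tasks.py has no
    # module-level dependency on models.py.
    from .models import MyModel

    for record in MyModel.objects.filter(some_field=id):
        record.my_field = 0.0  # placeholder for the real computation
        record.save()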

Alternative:

You can use the send_task() method to call your task by name:

from celery import current_app
from django.db import models

class MyModel(models.Model):
    field1 = models.IntegerField()
    #more fields
    my_field = models.FloatField(null=True)

    @staticmethod
    def load_from_file(file):
        #parse file, set fields from file
        current_app.send_task('myapp.tasks.my_task', (id,))
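send_task() also accepts the usual execution options if you need them — for example (the countdown value is only illustrative):

current_app.send_task(
    'myapp.tasks.my_task',
    args=(id,),
    countdown=10,   # optional: run the task 10 seconds from now
)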

15👍

Just to toss one more not-great solution into this list, what I’ve ended up doing is relying on Django’s now-built-in app registry.

So in tasks.py, rather than importing from models, you use apps.get_model() to gain access to the model.

I do this with a helper function carrying a healthy bit of documentation, just to express why this is painful:

from django.apps import apps

def _model(model_name):
    """Generically retrieve a model object.

    This is a hack around Django/Celery's inherent circular import
    issues with tasks.py/models.py. In order to keep clean abstractions, we use
    this to avoid importing from models, introducing a circular import.

    No solutions for this are good so far (unnecessary signals, inline imports,
    serializing the whole object, tasks forced to be in model, this), so we
    use this because at least the annoyance is constrained to tasks.
    """
    return apps.get_model('my_app', model_name)

And then:

from celery import shared_task

@shared_task
def some_task(post_id):
    post = _model('Post').objects.get(pk=post_id)

You could certainly just use apps.get_model() directly though.
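If you skip the helper, the direct version looks roughly like this (the app label and model name are placeholders):

from celery import shared_task
from django.apps import apps

@shared_task
def some_task(post_id):
    # Resolved at call time via the app registry, so models.py is never
    # imported at module level in tasks.py.
    Post = apps.get_model('my_app', 'Post')
    post = Post.objects.get(pk=post_id)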

👤umbrae

0👍

Not sure if this is anyone else’s problem, but it took me a few hours to find a solution. The key was this passage from the documentation:

Using the @shared_task decorator

The tasks you write will probably live in reusable apps, and reusable apps cannot depend on the project itself, so you also cannot import your app instance directly.

Basically what I was doing was this…

####
# project/coolapp/tasks.py -- DON'T DO THIS
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("coolapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()


@app.task(bind=True)
def some_task(self, some_id):
    from coolapp.models import CoolPerson

####
# project/coolapp/__init__.py -- DON'T DO THIS
from __future__ import absolute_import, unicode_literals
from .tasks import app as celery_app
__all__ = ("celery_app",)

Therefore, I was getting weird errors about missing app labels (a clear indication of a circular import).

The solution…

Move the Celery app definition out of the reusable app and into the project package: refactor project/coolapp/tasks.py -> project/project/tasks.py and project/coolapp/__init__.py -> project/project/__init__.py.

IMPORTANT: the project package is not (and should not be) added to INSTALLED_APPS. Otherwise, you’ll get the circular import again.

Then, to start the worker:

celery -A project.project worker -l INFO
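For reference, the refactored layout might look roughly like this (module and app names are illustrative and follow the example above; the Celery app lives in the project package and the reusable app only uses @shared_task):

####
# project/project/tasks.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("project")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

####
# project/project/__init__.py
from .tasks import app as celery_app
__all__ = ("celery_app",)

####
# project/coolapp/tasks.py
from celery import shared_task

@shared_task(bind=True)
def some_task(self, some_id):
    from coolapp.models import CoolPerson
    ...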

Also, a little debugging tip…

When you want to find out whether your tasks are properly discovered, put this in the module where the Celery app is created (project/project/tasks.py in this example):

app.autodiscover_tasks()
assert "project.app.tasks.some_task" in app.tasks

Otherwise, you’ll have to start up the worker only to realize your tasks aren’t included in the app, then you’ll have to wait for shutdown.
