1👍
Use signals.
tasks.py
from models import MyModel, my_signal
from celery import shared_task
from django.dispatch import receiver
@shared_task
def my_task(id):
qs = MyModel.objects.filter(some_field=id)
for record in qs:
my_value = #do some computations
record.my_field = my_value
record.save()
@receiver(my_signal)
def my_receiver(sender, **kwargs):
my_task.delay(kwargs['id'])
models.py
from django.db import models
from tasks import my_task
from django.dispatch import Signal
my_signal = Signal(providing_args=['id'])
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
my_signal.send(sender=?, id=?)
23👍
The solution posted by joshua is very good, but when I first tried it, I found that my @receiver
decorators had no effect. That was because the tasks
module wasn’t imported anywhere, which was expected as I used task auto-discovery.
There is, however, another way to decouple tasks.py
from modules.py
. Namely, tasks can be sent by name and they don’t have to be evaluated (imported) in the process that sends them:
from django.db import models
#from tasks import my_task
import celery
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
#my_task.delay(id)
celery.current_app.send_task('myapp.tasks.my_task', (id,))
send_task()
is a method on Celery app objects.
In this solution it is important to take care of correct, predictable names for your tasks.
- [Django]-How to copy modules from one virtualenv to another
- [Django]-Context processors vs middleware in django
- [Django]-Showing json field in Django admin
19👍
In your models instead of importing the my_task
at the beginning of the file, you can import it just before you use it. It will solve circular imports problem.
from django.db import models
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
from tasks import my_task # import here instead of top
my_task.delay(id)
Alternatively, you can also do same thing in your tasks.py
. You can import your models just before you use it instead of beginning.
Alternative:
You can use send_task
method to call your task
from celery import current_app
from django.db import models
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
current_app.send_task('myapp.tasks.my_task', (id,))
- [Django]-Django model blank=False does not work?
- [Django]-How to reload modules in django shell?
- [Django]-How to add verbose_name to forms
15👍
Just to toss one more not-great solution into this list, what I’ve ended up doing is relying on django’s now-built-in app registry.
So in tasks.py
, rather than importing from models, you use apps.get_model()
to gain access to the model.
I do this with a helper method with a healthy bit of documentation just to express why this is painful:
from django.apps import apps
def _model(model_name):
"""Generically retrieve a model object.
This is a hack around Django/Celery's inherent circular import
issues with tasks.py/models.py. In order to keep clean abstractions, we use
this to avoid importing from models, introducing a circular import.
No solutions for this are good so far (unnecessary signals, inline imports,
serializing the whole object, tasks forced to be in model, this), so we
use this because at least the annoyance is constrained to tasks.
"""
return apps.get_model('my_app', model_name)
And then:
@shared_task
def some_task(post_id):
post = _model('Post').objects.get(pk=post_id)
You could certainly just use apps.get_model()
directly though.
- [Django]-Getting database values using get_object_or_404
- [Django]-Django: Not Found static/admin/css
- [Django]-In Django filter statement what's the difference between __exact and equal sign (=)?
0👍
Not sure if this is anyone else’s problem, but I took a few hours, and I found a solution…mainly, the key from the documentation:
Using the @shared_task decorator
The tasks you write will probably live in reusable apps, and reusable apps cannot depend on the project itself, so you also cannot import your app instance directly.
Basically what I was doing was this…
####
# project/coolapp/tasks.py -- DON'T DO THIS
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("coolapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
@app.task(bind=True)
def some_task(self, some_id):
from coolapp.models import CoolPerson
####
# project/coolapp/__init__.py -- DON'T DO THIS
from __future__ import absolute_import, unicode_literals
from .tasks import app as celery_app
__all__ = ("celery_app",)
Therefore, I was getting weird errors about missing app labels (a clear indication of a circular import).
The solution…
Refactor project/coolapp/tasks.py
-> project/project/tasks.py
and project/coolapp/__init__.py
-> project/project/__init__.py
.
IMPORTANT: This does not (and should not) be added to INSTALLED_APPS
. Otherwise, you’ll get the circular import.
So then to start the woker:
celery -A project.project worker -l INFO
Also, a little debugging tip…
When you want to find out if your tasks are properly discovered, put this in project/project/app.py
:
app.autodiscover_tasks()
assert "project.app.tasks.some_task" in app.tasks
Otherwise, you’ll have to start up the worker only to realize your tasks aren’t included in the app, then you’ll have to wait for shutdown.
- [Django]-Allow only one instance of a model in Django
- [Django]-How to add token auth to swagger + django rest framework?
- [Django]-UUID('…') is not JSON serializable