40 votes
For Django we need to use another signal: @celery_app.on_after_finalize.connect. It can be used for both:
- declaring the task schedule next to the task in app/tasks.py, because this signal fires only after every tasks.py has been imported and all possible receivers are already subscribed (first case);
- centralized schedule declaration, because the Django apps are already initialized and ready for imports at that point (second case).
Here are the final declarations:
First case
Declaration of the task schedule next to the task, in main_app/some_app/tasks.py:
from main_app.celery import app as celery_app

@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)
Second case
Centralized schedule declaration in the config file main_app/celery.py:
...
app = Celery()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...
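Note that these handlers only register the schedule; a beat process still has to be running to dispatch the tasks. A typical development invocation for this layout (assuming the Celery app lives in main_app/celery.py) would be:
celery -A main_app worker --beat --loglevel=info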
20 votes
If the intent is to maintain the task logic separately in tasks.py, then calling from main_app.some_app.tasks import test inside setup_periodic_tasks did not work for me. What worked is the following:
celery.py
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

@app.task
def test(arg):
    print(arg)
    # The import shadows the name locally and delegates to the
    # shared task defined in some_app/tasks.py.
    from some_app.tasks import test
    test(arg)
tasks.py
from celery import shared_task

@shared_task
def test(arg):
    print('world')
This resulted in the following output:
[2017-10-26 22:52:42,262: INFO/MainProcess] celery@ubuntu-xenial ready.
[2017-10-26 22:52:42,263: INFO/MainProcess] Received task: main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4]
[2017-10-26 22:52:42,367: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:42,368: WARNING/ForkPoolWorker-2] world
[2017-10-26 22:52:42,369: INFO/ForkPoolWorker-2] Task main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4] succeeded in 0.002823335991706699s: None
[2017-10-26 22:52:51,205: INFO/Beat] Scheduler: Sending due task add every 10 (main_app.celery.test)
[2017-10-26 22:52:51,207: INFO/MainProcess] Received task: main_app.celery.test[ce0f3cfc-54d5-4d74-94eb-7ced2e5a6c4b]
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] world
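As a hedged alternative sketch (not from the original answer): the shared task in tasks.py can also be referenced by its registered name with a signature, which avoids both the wrapper task and the module-level import. This assumes the task's fully qualified name is some_app.tasks.test:
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Build a signature from the task name so tasks.py does not need
    # to be imported here; beat only sends the task by name.
    sender.add_periodic_task(
        10.0,
        app.signature('some_app.tasks.test', args=('hello',)),
        name='add every 10',
    )
The worker then resolves the actual function at execution time.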
1 vote
If you want to keep the task logic separate, use this setup:
celery.py:
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')  # your settings.py path

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5, periodic_task.s('sms'), name='SMS Process')
    sender.add_periodic_task(60, periodic_task.s('email'), name='Email Process')

@app.task
def periodic_task(taskname):
    from myapp.tasks import sms_process, email_process
    if taskname == 'sms':
        sms_process()
    elif taskname == 'email':
        email_process()
A sample task module in a Django app named myapp:
myapp/tasks.py:
def sms_process():
    print('send sms task')

def email_process():
    print('send email task')
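If you need cron-style timing rather than a fixed interval, add_periodic_task also accepts a crontab schedule (a minimal sketch, reusing the periodic_task wrapper from above):
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Send the SMS batch every Monday at 07:30.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        periodic_task.s('sms'),
        name='SMS Process (Mondays)',
    )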
1 vote
For the new way, you still need to register periodic tasks in settings.py, just as with the old way. For example, the app created with Celery() is defined in core/celery.py as shown below:
# "core/celery.py"
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
app = Celery('core')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
Then, there is the display task below:
# "my_app/tasks.py"
from celery import shared_task
@shared_task
def display(arg):
return arg
Now, for the new way, you need to register the display task periodically in core/settings.py as shown below, and then it works properly. *You need to import app and the display task as shown below:
# "core/settings.py"
from .celery import app
from my_app.tasks import display
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
3.0,
display.s('Hello'),
name='display-every-3-seconds'
)
sender.add_periodic_task(
7.0,
display.s('World'),
name='display-every-7-seconds'
)
In addition, the old way below also works properly:
# "core/settings.py"
CELERY_BEAT_SCHEDULE = {
"display-every-3-seconds": {
"task": "my_app.tasks.display",
"schedule": 3.0,
"args": ["Hello"],
},
"display-every-7-seconds": {
"task": "my_app.tasks.display",
"schedule": 7.0,
"args": ["World"],
},
}
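The "schedule" value in CELERY_BEAT_SCHEDULE is not limited to a number of seconds; a crontab object also works (a minimal sketch, reusing the same display task):
# "core/settings.py"

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "display-every-monday-morning": {
        "task": "my_app.tasks.display",
        # Run every Monday at 07:30.
        "schedule": crontab(hour=7, minute=30, day_of_week=1),
        "args": ["Hello"],
    },
}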
0 votes
I got it working with the following:
celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
app = Celery('mysite')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
tasks.py
from celery import current_app

app = current_app._get_current_object()

@app.task
def test(arg):
    print(arg)

@app.on_after_finalize.connect
def app_ready(**kwargs):
    """
    Called once after app has been finalized.
    """
    sender = kwargs.get('sender')

    # periodic tasks
    speed = 5
    sender.add_periodic_task(speed, test.s('foo'), name='update leases every {} seconds'.format(speed))
Running the worker as:
celery -A mysite worker --beat --scheduler django --loglevel=info
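The --scheduler django flag in the command above points at django-celery-beat's database scheduler, so that package needs to be installed and migrated. An equivalent, more explicit setup (a sketch, assuming that package) is to configure the scheduler in settings:
# "mysite/settings.py" -- assumes django-celery-beat is installed and migrated

INSTALLED_APPS = [
    # ...
    'django_celery_beat',
]

# With namespace='CELERY', this maps to Celery's beat_scheduler setting.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'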
0 votes
I was struggling as well (no activity in the terminal) and got it working with the setup below:
Django version 3.2.8, Celery version 5.2.0
In a Django project called Proj, create Proj/Proj/celery.py (a file next to settings.py):
celery.py
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Proj.settings')
app = Celery('Proj')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
Within the __init__.py (same folder as settings.py):
__init__.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
Within any Django sub-app folder, add a file called tasks.py (next to models.py):
tasks.py
from Proj.celery import app

# Schedule
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 1 second.
    sender.add_periodic_task(1.0, test.s('hello'), name='add every 1')
    # Calls test('world') every 3 seconds.
    sender.add_periodic_task(3.0, test.s('world'), expires=10)

# Tasks
@app.task
def test(arg):
    print(arg)
Then, run the command below in a terminal (inside the virtual environment, if applicable):
celery -A Proj worker -B
RESULT (confirms it's working):
[2021-11-10 11:22:22,070: WARNING/MainProcess] /.venv/lib/python3.9/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-9] hello
[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-3] hello
[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-2] world
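For what it's worth, the -B flag embeds the beat scheduler inside the worker process, which is convenient for development; the two can also be run as separate processes (same Proj app assumed):
celery -A Proj worker --loglevel=info
celery -A Proj beat --loglevel=info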