[Django]-Connect new celery periodic task in django

40πŸ‘

For django we need to use another signal: @celery_app.on_after_finalize.connect. It can be used for both:

  • declaration of task schedule close to task in app/tasks.py because this signal will be fired after all tasks.py imported and all possible receivers already subscribed (first case).
  • centralized schedule declaration because django apps will be already initialized and ready for imports (second case)

I think I should write down final declaration:

First case

Declaration of task schedule close to task:

main_app/some_app/tasks.py

from main_app.celery import app as celery_app

@celery_app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)

Second case

Centralized schedule declaration in config file main_app/celery.py:

...

app = Celery()

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...
πŸ‘€vvkuznetsov

20πŸ‘

If the intent is to maintain task logic separately in tasks.py, then calling from main_app.some_app.tasks import test inside setup_periodic_tasks did not work for me. What worked is the following:

celery.py

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

@app.task
def test(arg):
    print(arg)
    from some_app.tasks import test
    test(arg)

tasks.py

@shared_task
def test(arg):
    print('world')

This resulted in the following output:

[2017-10-26 22:52:42,262: INFO/MainProcess] celery@ubuntu-xenial ready.
[2017-10-26 22:52:42,263: INFO/MainProcess] Received task: main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4]  
[2017-10-26 22:52:42,367: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:42,368: WARNING/ForkPoolWorker-2] world
[2017-10-26 22:52:42,369: INFO/ForkPoolWorker-2] Task main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4] succeeded in 0.002823335991706699s: None
[2017-10-26 22:52:51,205: INFO/Beat] Scheduler: Sending due task add every 10 (main_app.celery.test)
[2017-10-26 22:52:51,207: INFO/MainProcess] Received task: main_app.celery.test[ce0f3cfc-54d5-4d74-94eb-7ced2e5a6c4b]  
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] world
πŸ‘€Prasanna

1πŸ‘

If you want to use task logic seperately, use this setup:

celery.py:

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings') # your settings.py path

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5, periodic_task.s('sms'), name='SMS Process')
    sender.add_periodic_task(60, periodic_task.s('email'), name='Email Process')


@app.task
def periodic_task(taskname):
    from myapp.tasks import sms_process, email_process

    if taskname == 'sms':
        sms_process()

    elif taskname == 'email':
        email_process()

a sample task in a django app named myapp:

myapp/tasks.py:

def sms_process():
    print('send sms task')

def email_process():
    print('send email task')
πŸ‘€suhailvs

1πŸ‘

For the new way, you still need to call tasks periodically in settings.py same as the old way.

For example, app with Celery() is defined in core/celery.py as shown below:

# "core/celery.py"

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

app = Celery('core')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Then, there is display task below:

# "my_app/tasks.py"

from celery import shared_task

@shared_task
def display(arg):
    return arg

Now, you need to call display task periodically in core/settings.py for the new way as shown below, then it works properly. *You need to import app and display task as shown below:

# "core/settings.py"

from .celery import app
from my_app.tasks import display

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        3.0, 
        display.s('Hello'), 
        name='display-every-3-seconds'
    )
    sender.add_periodic_task(
        7.0, 
        display.s('World'), 
        name='display-every-7-seconds'
    )

In addition, this below is the old way which also works properly:

# "core/settings.py"

CELERY_BEAT_SCHEDULE = {
    "display-every-3-seconds": {
        "task": "my_app.tasks.display",
        "schedule": 3.0,
        "args": ["Hello"],
    },
    "display-every-7-seconds": {
        "task": "my_app.tasks.display",
        "schedule": 7.0,
        "args": ["World"],
    },
}

0πŸ‘

I got it working with using

celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

tasks.py

from celery import current_app
app = current_app._get_current_object()

@app.task
def test(arg):
    print(arg)

@app.on_after_finalize.connect
def app_ready(**kwargs):
    """
    Called once after app has been finalized.
    """
    sender = kwargs.get('sender')

    # periodic tasks
    speed = 5
    sender.add_periodic_task(speed, test.s('foo'),name='update leases every {} seconds'.format(speed))

running worker as

celery -A mysite worker --beat --scheduler django --loglevel=info
πŸ‘€Dromosys

0πŸ‘

Was struggling as well, no activity in terminal, got it working with below:

Django version 3.2.8, Celery version 5.2.0

In Django project, called Proj

Proj/Proj celery.py (a file next to settings.py)

celery.py

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Proj.settings')

app = Celery('Proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

within the __init__.py (same folder as settings.py)

__init__.py

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Within any sub django app folder, a file called tasks.py (next to models.py)

tasks.py

from Proj.celery import app

# Schedule
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 1 seconds.
    sender.add_periodic_task(1.0, test.s('hello'), name='add every 1')

    # Calls test('world') every 3 seconds
    sender.add_periodic_task(3.0, test.s('world'), expires=10)

# Tasks
@app.task
def test(arg):
    print(arg)

Then, run below in terminal, using virtual environment, if applicable:

>>> celery -A Proj worker -B

RESULT (confirms it’s working):

[2021-11-10 11:22:22,070: WARNING/MainProcess] /.venv/lib/python3.9/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
            leak, never use this setting in production environments!
  warnings.warn('''Using settings.DEBUG leads to a memory

[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-9] hello
[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-3] hello
[2021-11-10 11:22:22,173: WARNING/ForkPoolWorker-2] world
πŸ‘€Jaco

Leave a comment