[Fixed]-Django/Celery multiple queues on localhost – routing not working

28👍

Ok, so i figured it out. Following is my whole setup, settings and how to run celery, for those who might be wondering about same thing as my question did.

Settings

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}

How to run celery?

terminal – tab 1:

celery -A proj worker -Q default -l debug -n default_worker

this will start first worker that consumes tasks from default queue. NOTE! -n default_worker is not a must for the first worker, but is a must if you have any other celery instances up and running. Setting -n worker_name is the same as --hostname=default@%h.

terminal – tab 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

this will start second worker that consumers tasks from feeds queue. Notice -n feeds_worker, if you are running with -l debug (log level = debug), you will see that both workers are syncing between them.

terminal – tab 3:

celery -A proj beat -l debug

this will start the beat, executing tasks according to the schedule in your CELERYBEAT_SCHEDULE.
I didn’t have to change the task, or the CELERYBEAT_SCHEDULE.

For example, this is how looks my CELERYBEAT_SCHEDULE for the task that should go to feeds queue:

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}

As you can see, no need for adding 'options': {'routing_key': 'long_tasks'} or specifying to what queue it should go. Also, if you were wondering why Update is upper cased, its because its a custom task, which are defined as sub classes of celery.Task.

Update Celery 5.0+

Celery made a couple changes since version 5, here is an updated setup for routing of tasks.

How to create the queues?

Celery can create the queues automatically. It works perfectly for simple cases, where celery default values for routing are ok.

task_create_missing_queues=True or, if you’re using django settings and you’re namespacing all celery configs under CELERY_ key, CELERY_TASK_CREATE_MISSING_QUEUES=True. Note, that it is on by default.

Automatic scheduled task routing

After configuring celery app:

celery_app.conf.beat_schedule = {
  "some_scheduled_task": {
    "task": "module.path.some_task",
    "schedule": crontab(minute="*/10"),
    "options": {"queue": "queue1"}
  }
}

Automatic task routing

Celery app still has to be configured first and then:

app.conf.task_routes = {
  "module.path.task2": {"queue": "queue2"},
}

Manual routing of tasks

In case and you want to route the tasks dynamically, then when sending the task specify the queue:

from module import task

def do_work():
  # do some work and launch the task
  task.apply_async(args=(arg1, arg2), queue="queue3")

More details re routing can be found here:
https://docs.celeryproject.org/en/stable/userguide/routing.html

And regarding calling tasks here:
https://docs.celeryproject.org/en/stable/userguide/calling.html

👤Neara

3👍

In addition to accepted answer, if anyone comes here and still wonders why his settings aren’t working (as I did just moments ago), here’s why: celery documentation isn’t listing settings names properly.

For celery 5.0.5 settings CELERY_DEFAULT_QUEUE, CELERY_QUEUES, CELERY_ROUTES should be named CELERY_TASK_DEFAULT_QUEUE, CELERY_TASK_QUEUESand CELERY_TASK_ROUTES instead. These are settings that I’ve tested, but my guess is the same rule applies for exchange and routing key aswell.

Leave a comment