28👍
Ok, so i figured it out. Following is my whole setup, settings and how to run celery, for those who might be wondering about same thing as my question did.
Settings
CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'feeds',
'routing_key': 'long_tasks',
},
}
How to run celery?
terminal – tab 1:
celery -A proj worker -Q default -l debug -n default_worker
this will start first worker that consumes tasks from default queue. NOTE! -n default_worker
is not a must for the first worker, but is a must if you have any other celery instances up and running. Setting -n worker_name
is the same as --hostname=default@%h
.
terminal – tab 2:
celery -A proj worker -Q feeds -l debug -n feeds_worker
this will start second worker that consumers tasks from feeds queue. Notice -n feeds_worker
, if you are running with -l debug
(log level = debug), you will see that both workers are syncing between them.
terminal – tab 3:
celery -A proj beat -l debug
this will start the beat, executing tasks according to the schedule in your CELERYBEAT_SCHEDULE
.
I didn’t have to change the task, or the CELERYBEAT_SCHEDULE
.
For example, this is how looks my CELERYBEAT_SCHEDULE
for the task that should go to feeds queue:
CELERYBEAT_SCHEDULE = {
...
'update_feeds': {
'task': 'arena.social.tasks.Update',
'schedule': crontab(minute='*/6'),
},
...
}
As you can see, no need for adding 'options': {'routing_key': 'long_tasks'}
or specifying to what queue it should go. Also, if you were wondering why Update
is upper cased, its because its a custom task, which are defined as sub classes of celery.Task
.
Update Celery 5.0+
Celery made a couple changes since version 5, here is an updated setup for routing of tasks.
How to create the queues?
Celery can create the queues automatically. It works perfectly for simple cases, where celery default values for routing are ok.
task_create_missing_queues=True
or, if you’re using django settings and you’re namespacing all celery configs under CELERY_
key, CELERY_TASK_CREATE_MISSING_QUEUES=True
. Note, that it is on by default.
Automatic scheduled task routing
After configuring celery app:
celery_app.conf.beat_schedule = {
"some_scheduled_task": {
"task": "module.path.some_task",
"schedule": crontab(minute="*/10"),
"options": {"queue": "queue1"}
}
}
Automatic task routing
Celery app still has to be configured first and then:
app.conf.task_routes = {
"module.path.task2": {"queue": "queue2"},
}
Manual routing of tasks
In case and you want to route the tasks dynamically, then when sending the task specify the queue:
from module import task
def do_work():
# do some work and launch the task
task.apply_async(args=(arg1, arg2), queue="queue3")
More details re routing can be found here:
https://docs.celeryproject.org/en/stable/userguide/routing.html
And regarding calling tasks here:
https://docs.celeryproject.org/en/stable/userguide/calling.html
3👍
In addition to accepted answer, if anyone comes here and still wonders why his settings aren’t working (as I did just moments ago), here’s why: celery documentation isn’t listing settings names properly.
For celery 5.0.5 settings CELERY_DEFAULT_QUEUE
, CELERY_QUEUES
, CELERY_ROUTES
should be named CELERY_TASK_DEFAULT_QUEUE
, CELERY_TASK_QUEUES
and CELERY_TASK_ROUTES
instead. These are settings that I’ve tested, but my guess is the same rule applies for exchange and routing key aswell.
- How can I tell Django templates not to parse a block containing code that looks like template tags?
- How to clear all session variables without getting logged out
- "Returning to that page might cause any action you took to be repeated" – Django
- RuntimeError: 'list' must be None or a list, not <class 'str'> while trying to start celery worker
- Why is post_save being raised twice during the save of a Django model?