38π
I have got it working, there are few things to note here:
According Celeryβs 4.2.0 documentation, CELERY_ROUTES
should be the variable to define queue routing, but it only works for me using CELERY_TASK_ROUTES
instead. The task routing seems to be independent from Celery Beat, therefore this will only work for tasks scheduled manually:
app1_test.delay()
app2_test.delay()
or
app1_test.apply_async()
app2_test.apply_async()
To make it work with Celery Beat, we just need to define the queues explicitly in the CELERY_BEAT_SCHEDULE
variable. The final setup of the file my_app/settings.py
would be as follows:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
'app1.tasks.*': {'queue': 'queue1'},
'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
'app1_test': {
'task': 'app1.tasks.app1_test',
'schedule': 15,
'options': {'queue': 'queue1'}
},
'app2_test': {
'task': 'app2.tasks.app2_test',
'schedule': 15,
'options': {'queue': 'queue2'}
},
}
And to run Celery listening on those two queues:
celery -A my_app worker -B -l INFO -Q queue1,queue2
Where
-A
: name of the project or app.-B
: Initiates the task scheduler Celery beat.-l
: Defines the logging level.-Q
: Defines the queues handled by this worker.
I hope this saves some time to other developers.
26π
adding queue
parameter to the decorator may help you,
@app.task(queue='queue1')
def app1_test():
print('I am app1_test task!')
time.sleep(10)
- [Django]-Get local timezone in django
- [Django]-Serving gzipped content from django
- [Django]-What is the difference render() and redirect() in Django?
1π
Okay as i have tried the same command that you have used to run the worker so I found that you just have to remove the "celery after the -Q parameter and thatβll be fine too.
So the old command is
celery -A my_app worker -B -l info -Q celery,queue1,queue2
And the new command is
celery -A my_app worker -B -l info -Q queue1,queue2
- [Django]-How to stream an HttpResponse with Django
- [Django]-Django model inheritance: create sub-instance of existing instance (downcast)?
- [Django]-What is the django command to delete all tables?