66👍
Here's the code I've been using. celery.task.control.Inspect.stats() returns a dict containing lots of details about the currently available workers, None if there are no workers running, or raises an IOError if it can't connect to the message broker. I'm using RabbitMQ; other messaging systems might behave slightly differently. This worked in Celery 2.3.x and 2.4.x; I'm not sure how far back it goes.
def get_celery_worker_status():
    ERROR_KEY = "ERROR"
    try:
        from celery.task.control import inspect
        insp = inspect()
        d = insp.stats()
        if not d:
            d = {ERROR_KEY: 'No running Celery workers were found.'}
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
            msg += ' Check that the RabbitMQ server is running.'
        d = {ERROR_KEY: msg}
    except ImportError as e:
        d = {ERROR_KEY: str(e)}
    return d
22👍
From the Celery 4.2 documentation:
from your_celery_app import app

def get_celery_worker_status():
    i = app.control.inspect()
    availability = i.ping()
    stats = i.stats()
    registered_tasks = i.registered()
    active_tasks = i.active()
    scheduled_tasks = i.scheduled()
    result = {
        'availability': availability,
        'stats': stats,
        'registered_tasks': registered_tasks,
        'active_tasks': active_tasks,
        'scheduled_tasks': scheduled_tasks
    }
    return result
Of course, you could (and should) improve the code with error handling.
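One way that error handling might look, as a minimal sketch rather than anything taken from the Celery docs. It assumes `app` is your Celery application and passes it in as an argument; the helper name `get_celery_worker_status_safe` and the `'error'` key are made up for illustration. The broad `except` is an assumption too, since the exception type raised on a broker failure depends on the transport in use.

```python
def get_celery_worker_status_safe(app, timeout=1.0):
    """Collect worker details, returning an 'error' entry instead of raising.

    `app` is assumed to be your Celery application object.
    """
    try:
        insp = app.control.inspect(timeout=timeout)
        availability = insp.ping()
    except Exception as exc:
        # Broad on purpose: the concrete exception depends on the broker transport.
        return {'error': 'Could not reach the broker: {}'.format(exc)}

    if not availability:
        # ping() yields nothing when no worker replies within the timeout.
        return {'error': 'No running Celery workers replied.'}

    return {
        'availability': availability,
        'stats': insp.stats(),
        'registered_tasks': insp.registered(),
        'active_tasks': insp.active(),
        'scheduled_tasks': insp.scheduled(),
    }
```

Callers can then check for the `'error'` key before reading the other entries.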
13👍
To check the same thing from the command line, for example when Celery is running as a daemon:
- Activate the virtualenv and go to the directory where the "app" is.
- Run:
celery -A [app_name] status
- It shows whether Celery is up, plus the number of nodes online.
7👍
The following worked for me:
import socket
from kombu import Connection

celery_broker_url = "amqp://localhost"
try:
    conn = Connection(celery_broker_url)
    conn.ensure_connection(max_retries=3)
except socket.error:
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))
6👍
One method to test if any worker is responding is to send out a "ping" broadcast and return with a successful result on the first response.
from .celery import app  # the celery 'app' created in your project

def is_celery_working():
    result = app.control.broadcast('ping', reply=True, limit=1)
    return bool(result)  # True if at least one result
This broadcasts a "ping" and will wait up to one second for responses. As soon as the first response comes in, it will return a result. If you want a False result faster, you can add a timeout argument to reduce how long it waits before giving up.
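As a rough sketch of the timeout variant: the function below takes the Celery app as a parameter (an assumption made here so the snippet stands alone) and forwards a `timeout` in seconds to `broadcast()`. The 0.25 second default is only an illustrative value, not a recommendation.

```python
def is_celery_working(app, timeout=0.25):
    """Return True if at least one worker answers a 'ping' within `timeout` seconds.

    `app` is assumed to be your project's Celery application.
    """
    replies = app.control.broadcast(
        'ping',
        reply=True,       # wait for replies instead of fire-and-forget
        limit=1,          # stop waiting as soon as one worker has answered
        timeout=timeout,  # give up after this many seconds
    )
    return bool(replies)
```

A shorter timeout returns False sooner when nothing is running, at the cost of possibly missing a worker that is slow to answer.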
3👍
I found an elegant solution:
from .celery import app

try:
    app.broker_connection().ensure_connection(max_retries=3)
except Exception as ex:
    raise RuntimeError("Failed to connect to celery broker, {}".format(str(ex)))
2👍
You can use the ping method to check whether any worker (or a specific worker) is alive:
https://docs.celeryproject.org/en/latest/_modules/celery/app/control.html#Control.ping
celery_app.control.ping()
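To target specific workers, `ping()` also accepts `destination` and `timeout` arguments. The sketch below is illustrative only: the helper name `alive_workers` is made up, the app is passed in as a parameter, and it assumes each reply is a one-item dict keyed by worker name, such as `{'celery@server1': {'ok': 'pong'}}`.

```python
def alive_workers(app, names=None, timeout=1.0):
    """Return the sorted names of the workers that answered the ping.

    `names` is an optional list of worker node names (for example
    ['celery@server1']); None pings every worker.
    """
    replies = app.control.ping(destination=names, timeout=timeout) or []
    # Flatten the list of one-item dicts into a list of worker names.
    return sorted(name for reply in replies for name in reply)
```

An empty list means no worker replied within the timeout.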
1👍
The script below worked for me.
import logging

# Import the Celery app from your project
from application_package import app as celery_app

logger = logging.getLogger(__name__)

def get_celery_worker_status():
    insp = celery_app.control.inspect()
    nodes = insp.stats()
    if not nodes:
        raise Exception("celery is not running.")
    # Informational message, so log at INFO rather than ERROR
    logger.info("celery workers are: {}".format(nodes))
    return nodes
1👍
You can test in your terminal by running the following command.
celery -A proj_name worker -l INFO
This starts a worker in the foreground with INFO-level logging, so you can watch its output every time Celery runs.
1👍
Run celery status to get the status.
When a Celery worker is running:
(venv) ubuntu@server1:~/project-dir$ celery status
-> celery@server1: OK
1 node online.
When no Celery worker is running, the terminal shows the following instead:
(venv) ubuntu@server1:~/project-dir$ celery status
Error: No nodes replied within time constraint
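If you want to consume that output from a script, a small parser along these lines may help. It is only a sketch: the helper name `parse_celery_status` is made up, and it assumes the `-> name: OK` line format shown in the transcripts above, which may differ between Celery versions.

```python
def parse_celery_status(output):
    """Return the node names reported as OK in `celery status` output.

    Assumes each live node appears on a line like '-> celery@server1: OK'.
    """
    suffix = ': OK'
    nodes = []
    for line in output.splitlines():
        line = line.strip()
        if line.startswith('->') and line.endswith(suffix):
            # Drop the leading '->' and the trailing ': OK'.
            nodes.append(line[2:-len(suffix)].strip())
    return nodes
```

The text could come from, for example, `subprocess.run(['celery', 'status'], capture_output=True, text=True).stdout`; an empty list then means no node replied.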