4👍

celery-singleton solves this requirement.

Caveat: it requires a Redis broker (for its distributed locks).

```
pip install celery-singleton
```

Use the `Singleton` task base class:

```python
from celery_singleton import Singleton

@celery_app.task(base=Singleton)
def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()
```

From the docs:

> calls to `do_stuff.delay()` will either queue a new task or return an `AsyncResult` for the currently queued/running instance of the task
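Under the hood this kind of deduplication takes a distributed lock keyed on the task name and its arguments. A rough sketch of that idea, using an in-memory dict as a stand-in for the atomic Redis `SET NX` that celery-singleton performs (the helper names here are illustrative, not celery-singleton's actual API):

```python
import hashlib
import json

# In-memory stand-in for Redis: celery-singleton really does an atomic
# Redis SET NX on a key derived from the task name and arguments.
_locks = {}

def lock_key(task_name, args, kwargs):
    """Derive a deterministic lock key from the task and its arguments."""
    payload = json.dumps([task_name, args, kwargs], sort_keys=True)
    return "lock:" + hashlib.sha256(payload.encode()).hexdigest()

def acquire_once(key, task_id):
    """Return True if this call won the lock, False if a duplicate exists."""
    if key in _locks:          # with Redis this check-and-set is one atomic op
        return False
    _locks[key] = task_id
    return True

def existing_task_id(key):
    """Task id of the already queued/running instance, if any."""
    return _locks.get(key)
```

The first caller acquires the lock and enqueues; later identical callers get the stored task id back, which is what lets `delay()` hand you an `AsyncResult` for the instance that is already queued or running.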
3👍
I would try a mix of a cache lock and a task result backend which stores each task's results:

- The cache lock will prevent tasks with the same arguments from being added to the queue multiple times. The Celery documentation contains a nice example of a cache lock implementation here, but if you don't want to create it yourself, you can use the celery-once module.
- For a task result backend, we will use the recommended django-celery-results, which creates a `TaskResult` table that we will query for task results.
Example:

- Install and configure django-celery-results:

  settings.py:

  ```python
  INSTALLED_APPS = (
      ...,
      'django_celery_results',
  )

  CELERY_RESULT_BACKEND = 'django-db'  # You can also use 'django-cache'
  ```

  ```
  ./manage.py migrate django_celery_results
  ```

- Install and configure the celery-once module:

  tasks.py:

  ```python
  from celery import Celery
  from celery_once import QueueOnce
  from time import sleep

  celery = Celery('tasks', broker='amqp://guest@localhost//')
  celery.conf.ONCE = {
      'backend': 'celery_once.backends.Redis',
      'settings': {
          'url': 'redis://localhost:6379/0',
          'default_timeout': 60 * 60
      }
  }

  @celery.task(base=QueueOnce)
  def do_stuff_for_some_time(some_id):
      e = Model.objects.get(id=some_id)
      e.domanystuff()
  ```

  At this point, if a task with the same arguments is about to be executed, an `AlreadyQueued` exception will be raised.

- Let's use the above:

  ```python
  from django_celery_results.models import TaskResult

  try:
      result = do_stuff_for_some_time(some_id)
  except AlreadyQueued:
      result = TaskResult.objects.get(task_args=some_id)
  ```
Caveats:

- Mind that at the time an `AlreadyQueued` exception arises, the initial task with argument=some_id may not have finished executing, and therefore it will not yet have results in the `TaskResult` table.
- Mind everything in your code that can go wrong and hang any of the above processes (because it will do that!).
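The first caveat can be handled by retrying the lookup until the result row appears. A minimal polling sketch; `wait_for_result` and its parameters are hypothetical names, not part of any of the libraries above:

```python
import time

def wait_for_result(fetch, timeout=10.0, interval=0.5):
    """Poll `fetch` until it returns, or give up after `timeout` seconds.

    `fetch` is any zero-argument callable that raises LookupError while the
    row is missing. With django-celery-results you would pass something like
    lambda: TaskResult.objects.get(task_args=some_id) and catch
    TaskResult.DoesNotExist instead of LookupError.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return fetch()
        except LookupError:
            if time.monotonic() >= deadline:
                raise TimeoutError("result row never appeared")
            time.sleep(interval)
```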
Extra Reading:

- Another Task with Lock DIY implementation
- django-celery-results' `TaskResult` model
2👍
I am not really sure whether Celery has such an option built in, but I would like to suggest a workaround:

1) Create a model for all the Celery tasks being queued. In that model, save the task_name, the queue_name, and the parameters.

2) Use get_or_create on that model for every Celery task that is ready to be queued.

3) If created == True in step 2, allow the task to be added to the queue; otherwise, do not add the task to the queue.
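The get_or_create deduplication above can be sketched without Django by leaning on a database UNIQUE constraint, which is what makes the check atomic. Here sqlite3 stands in for your real database, and `should_enqueue` and the `queued_task` table are hypothetical names for illustration:

```python
import sqlite3

# In Django you would define a QueuedTask model with
# unique_together = ('task_name', 'queue_name', 'params') and call
# QueuedTask.objects.get_or_create(...); the idea is the same.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE queued_task (
        task_name  TEXT NOT NULL,
        queue_name TEXT NOT NULL,
        params     TEXT NOT NULL,
        UNIQUE (task_name, queue_name, params)
    )
""")

def should_enqueue(task_name, queue_name, params):
    """Return True (and record the task) only for the first identical request."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO queued_task VALUES (?, ?, ?)",
        (task_name, queue_name, params),
    )
    return cur.rowcount == 1  # 1 row inserted -> not a duplicate
```

In a real setup you would also delete the row when the task finishes, otherwise the same task could never be queued again.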