[Solved]-Is it possible to skip delegating a celery task if the task name and params are already queued on the server?

4👍

celery-singleton solves this requirement

Caveat: requires redis broker (for distributed locks)

pip install celery-singleton

Use the Singleton task base class:

from celery_singleton import Singleton

@celery_app.task(base=Singleton)
def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()

from the docs:

calls to do_stuff.delay() will either queue a new task
or return an AsyncResult for the currently queued/running instance of
the task
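
Under the hood, celery-singleton derives a lock key from the task name and arguments and claims it in Redis; only the first `.delay()` call enqueues, and later identical calls get the existing task id back. A minimal sketch of that dedup idea, using an in-memory dict in place of Redis (the function names here are illustrative, not the library's API):

```python
import hashlib
import json
import uuid

# In-memory stand-in for the Redis locks celery-singleton uses.
_locks = {}

def _lock_key(task_name, args, kwargs):
    # Hash the task name plus its arguments into a stable key.
    payload = json.dumps([task_name, args, kwargs], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def delay_once(task_name, *args, **kwargs):
    """Return the existing task id if an identical task is already
    queued; otherwise 'enqueue' a new one and return its fresh id."""
    key = _lock_key(task_name, args, kwargs)
    if key in _locks:
        return _locks[key]           # duplicate: reuse the queued task id
    task_id = str(uuid.uuid4())      # new task: record it and enqueue
    _locks[key] = task_id
    return task_id

first = delay_once("do_stuff_for_some_time", 42)
second = delay_once("do_stuff_for_some_time", 42)   # deduplicated
third = delay_once("do_stuff_for_some_time", 43)    # different args, new task
assert first == second and first != third
```

The real library also clears the lock when the task finishes, so the same arguments can be queued again afterwards.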

3👍

I would try a mix of a cache lock and a task result backend that stores each task’s results:

  • The cache lock will prevent tasks with the same arguments from being added to the queue multiple times. The Celery documentation contains a nice example of a cache-lock implementation here, but if you don’t want to write it yourself, you can use the celery-once module.

  • For the task result backend, we will use the recommended django-celery-results, which creates a TaskResult table that we can query for task results.
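
The cache-lock pattern from the Celery docs boils down to an atomic "add this key only if absent" operation with a timeout: only one caller wins the key, and the timeout guarantees the lock eventually expires even if a worker dies mid-task. A self-contained sketch of that pattern, using a dict with expiry times in place of a real cache backend:

```python
import time

_cache = {}  # key -> expiry timestamp; stand-in for memcached/redis

def cache_add(key, timeout):
    """Atomically claim `key` for `timeout` seconds.
    Returns True only for the first caller while the key is live."""
    now = time.monotonic()
    if key in _cache and _cache[key] > now:
        return False                 # someone already holds the lock
    _cache[key] = now + timeout
    return True

def enqueue_unique(task_name, some_id, timeout=60 * 60):
    """Queue a task only if an identical one is not already locked."""
    lock_key = f"{task_name}:{some_id}"
    if not cache_add(lock_key, timeout):
        return None                  # identical task already queued; skip
    # ... here you would call task.delay(some_id) ...
    return lock_key

assert enqueue_unique("do_stuff_for_some_time", 1) is not None
assert enqueue_unique("do_stuff_for_some_time", 1) is None   # duplicate skipped
assert enqueue_unique("do_stuff_for_some_time", 2) is not None
```

celery-once implements exactly this bookkeeping for you against Redis, which is why the configuration below points it at a Redis URL with a `default_timeout`.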

Example:

  • Install and configure django-celery-results:

    settings.py:

    INSTALLED_APPS = (
        ...,
        'django_celery_results',
    )
    CELERY_RESULT_BACKEND = 'django-db'  # You can also use 'django-cache'
    

    ./manage.py migrate django_celery_results

  • Install and configure the celery-once module:

    tasks.py:

    from celery import Celery
    from celery_once import QueueOnce
    from time import sleep
    
    celery = Celery('tasks', broker='amqp://guest@localhost//')
    celery.conf.ONCE = {
        'backend': 'celery_once.backends.Redis',
        'settings': {
            'url': 'redis://localhost:6379/0',
            'default_timeout': 60 * 60
         }
    }
    
    @celery.task(base=QueueOnce)
    def do_stuff_for_some_time(some_id):
        e = Model.objects.get(id=some_id)
        e.domanystuff()
    

    At this point, if a task with the same arguments is going to be executed,
    an AlreadyQueued exception will be raised.

  • Let’s use the above:

    from celery_once import AlreadyQueued
    from django_celery_results.models import TaskResult
    
    try:
        result = do_stuff_for_some_time.delay(some_id)
    except AlreadyQueued:
        # task_args is stored as a serialized string, so adjust the
        # lookup to match your argument serialization
        result = TaskResult.objects.get(task_args=str(some_id))
    

Caveats:

  • Mind that at the time the AlreadyQueued exception arises, the initial task with argument=some_id may not have finished executing yet, and therefore it will not have results in the TaskResult table.

  • Mind anything in your code that can fail and hang either of the above processes (because, sooner or later, it will!).


2👍

I am not really sure if Celery has such an option. However, I would like to suggest a workaround.

1) Create a model for all the celery tasks being queued. In that model, save the task_name, the queue_name, and the parameters.

2) Use get_or_create on that model for every celery task that is about to be queued.

3) If created == True in step 2, allow the task to be added to the queue; otherwise, do not add it.
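
The three steps above can be sketched as follows. `QueuedTask` would be a hypothetical Django model with task_name, queue_name, and params fields; here it is replaced by an in-memory stand-in so the dedup logic is visible and runnable:

```python
import json

# In-memory stand-in for a hypothetical QueuedTask Django model
# (steps 1-3 above): key -> saved record.
_queued = {}

def get_or_create(task_name, queue_name, params):
    """Mimic Django's Model.objects.get_or_create():
    returns (record, created)."""
    key = (task_name, queue_name, json.dumps(params, sort_keys=True))
    if key in _queued:
        return _queued[key], False
    record = {"task_name": task_name, "queue_name": queue_name, "params": params}
    _queued[key] = record
    return record, True

def maybe_enqueue(task_name, queue_name, params):
    _, created = get_or_create(task_name, queue_name, params)
    if created:
        # Only now hand the task to celery, e.g. task.apply_async(...)
        return True
    return False    # identical task already queued; skip it

assert maybe_enqueue("do_stuff", "default", {"some_id": 1}) is True
assert maybe_enqueue("do_stuff", "default", {"some_id": 1}) is False
assert maybe_enqueue("do_stuff", "default", {"some_id": 2}) is True
```

With a real model you would also need to delete (or mark done) each record when its task finishes, otherwise the same arguments can never be queued again.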
