Is there any way to define task quota in Celery?


Celery has a setting to control the rate limit (http://celery.readthedocs.org/en/latest/userguide/tasks.html#Task.rate_limit), that is, the maximum number of times a given task may run within a time frame.
You could set this to ‘100/m’ (one hundred per minute), meaning your system allows 100 runs of that task per minute. It is important to notice that this setting is neither per user nor per task invocation; it applies per time frame and per worker instance.
Have you thought about this approach instead of limiting per user?
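
For reference, such a rate limit is declared directly on the task; a minimal sketch (the task name and body below are just placeholders) would be:

from celery import shared_task

@shared_task(rate_limit='100/m')  # at most 100 executions per minute, per worker
def my_rate_limited_task(user_id):
    pass  # placeholder body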

In order to have a ‘rate_limit’ per task and user pair you will have to implement it yourself. I think (not sure) you could use a TaskRouter or a signal, depending on your needs.
TaskRouters (http://celery.readthedocs.org/en/latest/userguide/routing.html#routers) allow you to route tasks to a specific queue by applying some logic.
Signals (http://celery.readthedocs.org/en/latest/userguide/signals.html) allow you to execute code at a few well-defined points of the task’s scheduling cycle.

An example of a router implementing that logic could be:

class QuotaRouter(object):  # the class name is arbitrary

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'A':
            user_id = args[0]  # in this task the user_id is the first argument
            qty = get_task_qty('A', user_id)
            if qty > LIMIT_FOR_A:
                return None  # over the quota for this user
        elif task == 'B':
            user_id = args[2]  # in this task the user_id is the third argument
            qty = get_task_qty('B', user_id)
            if qty > LIMIT_FOR_B:
                return None  # over the quota for this user
        return {'queue': 'default'}
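
To make Celery consult the router for every task sent, it has to be registered in your configuration; with the old-style settings used in the docs linked above that would be:

CELERY_ROUTES = (QuotaRouter(), )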

With the approach above, every time a task starts you would have to increment a counter for the user_id/task_type pair somewhere (for example in Redis), and
every time a task finishes you would have to decrement that value in the same place.
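
A minimal sketch of that bookkeeping, using Celery’s task_prerun/task_postrun signals with a Redis counter, could look like the following. The key layout, the assumption that the user_id is always the first argument, and the idea that the hypothetical get_task_qty() above would simply read this counter are all illustrations, not a fixed recipe:

import redis
from celery.signals import task_prerun, task_postrun

redis_client = redis.Redis()  # assumes a Redis server on localhost:6379

@task_prerun.connect
def increment_task_counter(task_id=None, task=None, args=None, **extra):
    user_id = args[0]  # assumption: the user_id is the first argument
    redis_client.incr('{0}_{1}'.format(task.name, user_id))

@task_postrun.connect
def decrement_task_counter(task_id=None, task=None, args=None, **extra):
    user_id = args[0]
    redis_client.decr('{0}_{1}'.format(task.name, user_id))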

To me it seems kind of complex, hard to maintain, and with a few points of failure.

Another approach, which I think could fit, is to implement some kind of ‘distributed semaphore’ (similar to a distributed lock) per user and task, so that every task which needs to limit its number of concurrent runs can use it.

The idea is that every time a task which should have ‘concurrency control’ starts, it has to check whether a resource is available; if not, it just returns.

You could imagine this idea as below:

from celery import shared_task

@shared_task
def my_task_A(user_id, arg1, arg2):
    resource_key = 'my_task_A_{}'.format(user_id)
    available = SemaphoreManager.is_available_resource(resource_key)
    if not available:
        # no resource available, so abort
        return

    acquired = False
    try:
        # the resource could have been acquired by another task just before us
        acquired = SemaphoreManager.acquire(resource_key)
        if acquired:
            pass  # execute your code here
    finally:
        # only release the semaphore if we actually acquired it
        if acquired:
            SemaphoreManager.release(resource_key)
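
SemaphoreManager here is not an existing library; a minimal sketch of one possible implementation, backed by a plain Redis counter (the limit value and the key handling are assumptions for illustration, and it is not crash-safe), could be:

import redis

_redis = redis.Redis()   # assumes a Redis server on localhost:6379
CONCURRENCY_LIMIT = 3    # max concurrent tasks per resource_key (example value)

class SemaphoreManager(object):

    @staticmethod
    def is_available_resource(resource_key):
        # cheap, read-only check; it can race, which is why acquire() re-checks
        return int(_redis.get(resource_key) or 0) < CONCURRENCY_LIMIT

    @staticmethod
    def acquire(resource_key):
        # atomically increment; if that pushed us over the limit, undo and fail
        if _redis.incr(resource_key) > CONCURRENCY_LIMIT:
            _redis.decr(resource_key)
            return False
        return True

    @staticmethod
    def release(resource_key):
        _redis.decr(resource_key)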

It’s hard to say which approach you SHOULD take because that depends on your application.

Hope it helps you!

Good luck!
