[Django]-Avoid concurrent access to same queue element

2👍

✅

In this case, selecting the “person” from the database again just to flip a flag in memory and then write it back is wasteful, and more importantly, it is not atomic.

Another process can grab the same record between these actions:

from datetime import datetime

person = People.objects.get(pk=person.id)   # read...
person.flag_allocated = True                # ...modify in memory...
person.date_of_allocation = datetime.now()
person.save()                               # ...write back: not atomic

That is where your problem is. But if you instead update the record directly in the database, with a condition so the UPDATE only writes to a row where flag_allocated=False, you just have to check whether the update affected any rows. If it didn’t, you move on to the next person in the queue.

Something like:

from datetime import datetime

for person in people:
    rows = People.objects.filter(pk=person.id, flag_allocated=False).update(
        flag_allocated=True, date_of_allocation=datetime.now()
    )
    if rows:
        break  # Got the person... And nobody else will.

The UPDATE takes a write lock on the row while it sets flag_allocated (standard SQL behavior). If two updates race for the same row, one runs first; the second then matches no rows, since the flag is already True, and moves on to the next person.
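The race can be demonstrated outside Django with plain SQL. This is a minimal sketch using sqlite3 (table and column names mirror the ones above, but the setup is hypothetical): the WHERE clause makes the check-and-set a single atomic statement, so only one of two callers can flip the flag.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, flag_allocated INTEGER DEFAULT 0)")
conn.execute("INSERT INTO people (id) VALUES (1)")
conn.commit()

def try_allocate(person_id):
    # Conditional update: check and set happen in one atomic statement.
    cur = conn.execute(
        "UPDATE people SET flag_allocated = 1 "
        "WHERE id = ? AND flag_allocated = 0",
        (person_id,),
    )
    conn.commit()
    return cur.rowcount  # 1 if we claimed the row, 0 if someone beat us to it

first = try_allocate(1)   # claims the row: rowcount == 1
second = try_allocate(1)  # no-op: rowcount == 0
```

The `rowcount` check plays the same role as testing the return value of `QuerySet.update()` above.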

2👍

If you’re not doing anything to prevent interference between concurrent processes, they’re bound to step on each other’s toes sooner or later.

A time-honored approach to modeling a queue in a database is to register a worker to a particular job, in a transactionally consistent manner, before executing the job.

Say you have a table work with columns for a job id or specification, an initially NULL status, and an initially NULL worker. A worker can “register” for a job by running an update such as:

    UPDATE `work`
    SET worker = 'my_worker_id', status = 'initializing'
    WHERE status IS NULL AND worker IS NULL
    LIMIT 1;

Only one worker can “register” the next job, thanks to the WHERE clause. (Note that LIMIT on an UPDATE is MySQL-specific syntax.)

This isn’t perfect: you still have to handle jobs orphaned by a failed worker. A status column updated on job completion, combined with some kind of heartbeat for the workers and careful design around job idempotency, gives you the primitives to ensure jobs don’t get stuck on a failed or AWOL worker.
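The claim-then-execute pattern can be sketched with sqlite3 (the table matches the `work` description above; everything else is a hypothetical illustration). SQLite has no LIMIT on UPDATE by default, so a subquery picks one free job, and repeating the NULL checks in the outer WHERE keeps the claim atomic:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE work (
    id INTEGER PRIMARY KEY,
    status TEXT,
    worker TEXT)""")
conn.executemany("INSERT INTO work (id) VALUES (?)", [(1,), (2,)])
conn.commit()

def claim_next_job(worker_id):
    # Atomically claim one unowned job; the outer WHERE re-checks
    # status/worker so two racing workers can never claim the same row.
    cur = conn.execute(
        """UPDATE work
           SET worker = ?, status = 'initializing'
           WHERE id = (SELECT id FROM work
                       WHERE status IS NULL AND worker IS NULL
                       ORDER BY id LIMIT 1)
             AND status IS NULL AND worker IS NULL""",
        (worker_id,),
    )
    conn.commit()
    if cur.rowcount:
        return conn.execute(
            "SELECT id FROM work WHERE worker = ? AND status = 'initializing'",
            (worker_id,),
        ).fetchone()[0]
    return None  # queue drained

job_a = claim_next_job("worker_a")  # claims job 1
job_b = claim_next_job("worker_b")  # claims job 2
job_c = claim_next_job("worker_c")  # None: nothing left to claim
```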

👤erik258

2👍

The canonical solution here is to use a lock of some sort so you can never have two concurrent executions of utilitary.recover_people at all: the function waits until it acquires the lock, executes, and releases the lock.

Given that Django is typically served by multiple processes (and you certainly don’t want to change that), and that you don’t want a crashed call to hold the lock forever, a good solution is to store the lock in something like redis (with all Django processes sharing the same redis db, of course), with an expiration set to a reasonable time so the lock can’t stay set forever.

There are examples of such setups using celery (not that you necessarily need celery here; it’s just that the principle is the same, since preventing concurrent tasks from stepping on each other is a common use case with celery).

You could also store the lock in your SQL database, but then you don’t get automatic expiration.
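The expiring-lock semantics can be sketched in pure Python, without a live Redis server. This mimics what Redis gives you with SET NX EX (all class and parameter names here are hypothetical; in production every Django process would talk to the same shared Redis instance instead of an in-process object):

```python
import time
import threading

class ExpiringLock:
    """In-process stand-in for a Redis lock with a TTL."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self._holder = None
        self._expires_at = 0.0
        self._guard = threading.Lock()

    def acquire(self, owner):
        with self._guard:
            now = time.monotonic()
            # A stale lock (holder crashed, TTL elapsed) can be taken over,
            # so a screwed-up call can never block everyone forever.
            if self._holder is None or now >= self._expires_at:
                self._holder = owner
                self._expires_at = now + self.ttl
                return True
            return False

    def release(self, owner):
        with self._guard:
            if self._holder == owner:
                self._holder = None

lock = ExpiringLock(ttl_seconds=0.1)
got_first = lock.acquire("proc-1")      # True: lock was free
got_second = lock.acquire("proc-2")     # False: proc-1 still holds it
time.sleep(0.15)
got_after_ttl = lock.acquire("proc-2")  # True: proc-1's lock expired
```

The TTL is the key design choice: it trades a small risk of early takeover against the guarantee that a dead process cannot wedge the queue.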
