[Django]-Celery task duplication issue

5👍

Finally fixed it by adding a lock mechanism that ensures the task is executed only once; more details here.

task.py

# ...
import redis

@task.task(ignore_result=True)
def celery_scheduled_campaign(schedule_id):
    LOCK_EXPIRE = 60 * 30  # lock expires in 30 minutes
    obj = campaign.objects.get(pk=schedule_id)
    my_lock = redis.Redis().lock(obj.campaign_uuid, timeout=LOCK_EXPIRE)
    # Non-blocking acquire: a duplicate delivery of the same task fails
    # to get the lock and returns immediately instead of running twice.
    if my_lock.acquire(blocking=False):
        try:
            if not obj.is_complete:
                # ...
                # Task to run
                # ...
                obj.is_complete = True
                obj.save()  # persist the flag, otherwise the task re-runs next time
        finally:
            my_lock.release()
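The non-blocking acquire in the task above can be illustrated without a Redis server: Python's `threading.Lock` has the same `acquire(blocking=False)` semantics as redis-py's `Lock`. This is only a local sketch of the idea, not the Redis-backed implementation; the `run_once` helper and `results` list are made up for the demonstration.

```python
import threading

lock = threading.Lock()
results = []

def run_once():
    # acquire(blocking=False) returns True only if nobody holds the lock,
    # and returns False immediately otherwise -- a duplicate caller is
    # skipped instead of blocking or running the work a second time.
    if lock.acquire(blocking=False):
        try:
            results.append("executed")
        finally:
            lock.release()
    else:
        results.append("skipped")

lock.acquire()   # simulate another worker already holding the lock
run_once()       # duplicate delivery: skipped
lock.release()
run_once()       # lock is free: the work runs
```

With Redis the lock is shared across all worker processes and machines, which is why the answer keys it on `campaign_uuid` rather than a process-local lock.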

models.py

# ...
import uuid
class campaign(models.Model):
    # ...
    campaign_uuid  = models.CharField(editable=False, max_length=100)
    is_complete    = models.BooleanField(default=False)
    # ...
    def save(self, *args, **kwargs):
        if not self.id:
            # Assign a stable UUID on first save only; it is used as the lock key.
            self.campaign_uuid = str(uuid.uuid4())
        super(campaign, self).save(*args, **kwargs)
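The `save()` override assigns the UUID only when the row has no primary key yet, so repeated saves keep the same lock key. A plain-Python sketch of that logic (the `Campaign` class and the fake pk assignment are illustrative stand-ins for the Django model and database):

```python
import uuid

class Campaign:
    """Minimal stand-in for the Django model's save() logic."""
    def __init__(self):
        self.id = None
        self.campaign_uuid = ""

    def save(self):
        if not self.id:
            # First save: mint the UUID that will serve as the lock key.
            self.campaign_uuid = str(uuid.uuid4())
        self.id = self.id or 1  # simulate the database assigning a pk

c = Campaign()
c.save()
first = c.campaign_uuid   # e.g. "3f2b8c1e-..."
c.save()
# The UUID is stable across subsequent saves.
```

A stable key matters here: if the UUID changed on every save, two deliveries of the same task could lock different keys and both run.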

1👍

Make sure all three messages are not being routed to the same port, which would cause multiple Celery instances to run on the same port and each consume the task.
