47👍
Based on MattH’s answer, you could use a decorator like this:
import functools

from django.core.cache import cache


def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            # cache.add() only stores the key if it does not exist yet, so it
            # acts as the lock; `timeout` expires it if a worker dies.
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    return func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc
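Then use it like so:
from datetime import timedelta

from celery.task import periodic_task  # old-style Celery API

@periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60 * 10)
def fetch_articles():
    ...  # yada yada: fetch the articles here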
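The decorator works because Django's `cache.add` only sets a key that is not already present and reports whether it did, so the first caller "wins" the lock and later callers are skipped until the key is deleted or its timeout expires. The sketch below swaps Django's cache for a hypothetical `InMemoryCache` class so the logic can run standalone; it is an illustration only. In a real deployment the cache has to be shared by all workers (for example Memcached or Redis), since Django's default `LocMemCache` is per-process and would not coordinate separate workers.

```python
import functools
import time


class InMemoryCache:
    """Hypothetical stand-in for Django's cache: add() only stores a key
    that is absent (or expired) and reports whether it did."""

    def __init__(self):
        self._data = {}  # key -> (value, expiry timestamp)

    def add(self, key, value, timeout):
        now = time.monotonic()
        entry = self._data.get(key)
        if entry is not None and entry[1] > now:
            return False  # key still held: lock not acquired
        self._data[key] = (value, now + timeout)
        return True

    def delete(self, key):
        self._data.pop(key, None)


cache = InMemoryCache()


def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            if cache.add(lock_id, "true", timeout):
                try:
                    return func(*args, **kwargs)
                finally:
                    cache.delete(lock_id)
            return None  # another instance holds the lock: skip this run
        return wrapper
    return task_exc


calls = []


@single_instance_task(60 * 10)
def fetch_articles():
    calls.append("outer")
    # Simulate a second invocation while the first still holds the lock.
    calls.append(fetch_articles())
    return "done"
```

Calling `fetch_articles()` runs the body once; the nested call inside it finds the lock taken and returns `None`.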
20👍
Using https://pypi.python.org/pypi/celery_once seems to do the job really nicely, including reporting errors and checking uniqueness against only some of the task's parameters.
You can do things like:
from celery_once import QueueOnce
from myapp.celery import app
from time import sleep


@app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
def start_billing(customer_id, year, month):
    sleep(30)
    return "Done!"
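which, in the celery_once version this was written for, just needs the following settings in your project (newer celery_once releases take an `app.conf.ONCE` dict instead):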
ONCE_REDIS_URL = 'redis://localhost:6379/0'
ONCE_DEFAULT_TIMEOUT = 60 * 60 # remove lock after 1 hour in case it was stale
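As I understand celery_once's `keys` option, the lock key is built from the task name plus only the listed arguments, so `start_billing(42, 2024, 1)` and `start_billing(42, 2024, 2)` would count as duplicates while `start_billing(7, 2024, 1)` would not. The sketch below illustrates that idea with a hypothetical `once_key` helper using only the standard library; it is not celery_once's actual key format or implementation.

```python
import inspect


def once_key(func, args, kwargs, keys=None):
    """Hypothetical helper: build a lock key from the function name plus
    the selected argument values (all arguments when keys is None)."""
    # Bind positional and keyword arguments to parameter names so that
    # f(42, 2024, 1) and f(42, year=2024, month=1) give the same key.
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    names = keys if keys is not None else tuple(bound.arguments)
    parts = ",".join(f"{name}={bound.arguments[name]!r}" for name in sorted(names))
    return f"{func.__qualname__}:{parts}"


def start_billing(customer_id, year, month):
    return "Done!"
```

With `keys=("customer_id",)` the year and month are ignored, which is what lets one billing run per customer proceed while blocking a second one for the same customer.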
10👍
If you're looking for an example that doesn't use Django, try this example (caveat: it uses Redis instead, which I was already using).
The decorator code is as follows (full credit to the author of the article, go read it):
import functools

import redis

REDIS_CLIENT = redis.Redis()


def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        # Preserve __name__ so Celery derives the task name from run_func.
        @functools.wraps(run_func)
        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            # Pass a distinct key per task: with the default "" every
            # decorated task would share the same lock.
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()
            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec
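The key detail is `lock.acquire(blocking=False)`: a call that cannot get the lock returns immediately and the task body is skipped, instead of queueing up behind the running one. To see that behaviour without a Redis server, the sketch below uses a hypothetical `only_one_local` decorator where a per-key `threading.Lock` stands in for the Redis lock. It only coordinates threads in one process and has no timeout, so it is a demonstration of the pattern, not a replacement for the Redis version.

```python
import functools
import threading

# Hypothetical process-local registry standing in for Redis: one
# threading.Lock per key. Unlike Redis, it cannot coordinate separate
# worker processes and has no timeout.
_LOCKS = {}
_REGISTRY_GUARD = threading.Lock()


def only_one_local(key):
    """Run the wrapped function only if no other call holds `key`."""

    def _dec(run_func):
        @functools.wraps(run_func)
        def _caller(*args, **kwargs):
            with _REGISTRY_GUARD:
                lock = _LOCKS.setdefault(key, threading.Lock())
            # Same pattern as the Redis version: try once, never wait.
            if not lock.acquire(blocking=False):
                return None
            try:
                return run_func(*args, **kwargs)
            finally:
                lock.release()

        return _caller

    return _dec


def demo():
    """Start one slow call in a thread, then try a second call meanwhile."""
    started = threading.Event()
    finish = threading.Event()

    @only_one_local("demo-task")
    def slow_task():
        started.set()
        finish.wait(5)  # hold the lock until the main thread says so
        return "ran"

    results = []
    worker = threading.Thread(target=lambda: results.append(slow_task()))
    worker.start()
    started.wait(5)
    second = slow_task()  # lock held by the thread, so this call is skipped
    finish.set()
    worker.join()
    # A call made after the first one finished gets the lock again.
    return results[0], second, slow_task()
```

`demo()` shows the overlapping call returning `None` while calls that don't overlap both run.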
6👍
I'm surprised nobody has mentioned using celery.app.control.inspect().active() to get the list of currently running tasks. Is it not real time? If it is, this would be very easy to implement, for instance:
from functools import wraps


def unique_task(callback):
    """
    Decorator to ensure only one instance of the task is running at once.
    """
    @wraps(callback)
    def _wrapper(celery_task, *args, **kwargs):
        # active() maps each worker's hostname to the list of tasks it is
        # currently executing (it can be None if no worker replies).
        active_by_worker = celery_task.app.control.inspect().active()
        if active_by_worker:
            for worker in active_by_worker:
                for running_task in active_by_worker[worker]:
                    # Discard the currently running task from the list.
                    if (celery_task.name == running_task['name']
                            and celery_task.request.id != running_task['id']):
                        return f'Task "{callback.__name__}()" cancelled! already running...'
        return callback(celery_task, *args, **kwargs)
    return _wrapper
Then just apply the decorator to the corresponding tasks:
@celery.task(bind=True)
@unique_task
def my_task(self):
    # task executed once at a time.
    pass
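The duplicate check itself is plain dictionary walking over what `inspect().active()` returns: a mapping from worker hostname to a list of task-info dicts that include `"name"` and `"id"`. The sketch below pulls that check into a hypothetical pure function, `is_already_running`, so it can be tried on sample data without a broker; the sample task names and IDs are made up. On the "is it real time?" question: as far as I know, `inspect()` broadcasts a request and waits up to a timeout for replies, and the check is not atomic, so two copies starting at nearly the same moment could both see each other and both cancel, or miss each other and both run.

```python
def is_already_running(active, task_name, own_id):
    """Return True if another task named task_name appears in `active`.

    `active` is assumed to have the shape returned by
    app.control.inspect().active(): {worker_hostname: [task_info, ...]},
    where each task_info dict has at least "name" and "id". It may be
    None when no worker replied in time.
    """
    if not active:
        return False
    return any(
        info["name"] == task_name and info["id"] != own_id
        for tasks in active.values()
        for info in tasks
    )


# Hypothetical sample reply from two workers.
sample = {
    "celery@host1": [{"id": "a1", "name": "myapp.tasks.my_task"}],
    "celery@host2": [
        {"id": "b2", "name": "myapp.tasks.my_task"},
        {"id": "c3", "name": "myapp.tasks.other_task"},
    ],
}
```

Here `is_already_running(sample, "myapp.tasks.my_task", "a1")` is true because task `b2` is another running copy, whereas a reply listing only the caller's own ID would not count as a duplicate.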
0👍
This solution works for Celery running on a single host with concurrency greater than 1. Among locks that avoid extra dependencies like Redis, only file-based ones work with concurrency greater than 1, since the worker processes don't share memory.
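from datetime import datetime, timedelta
from fcntl import flock, LOCK_EX, LOCK_NB
from hashlib import md5
from os.path import join
from time import sleep

from celery.task import PeriodicTask  # old-style class-based Celery API


class Lock(object):
    def __init__(self, filename):
        self.f = open(filename, 'w')

    def __enter__(self):
        try:
            # Exclusive, non-blocking lock: fails at once if another
            # process already holds it.
            flock(self.f.fileno(), LOCK_EX | LOCK_NB)
            return True
        except IOError:
            pass
        return False

    def __exit__(self, *args):
        # Closing the file releases the flock.
        self.f.close()


class SinglePeriodicTask(PeriodicTask):
    abstract = True
    run_every = timedelta(seconds=1)

    def __call__(self, *args, **kwargs):
        # One lock file per task name: /tmp/<md5 of the task name>.
        lock_filename = join('/tmp', md5(self.name.encode()).hexdigest())
        with Lock(lock_filename) as is_locked:
            if is_locked:
                return super(SinglePeriodicTask, self).__call__(*args, **kwargs)
            else:
                print('already working')


class SearchTask(SinglePeriodicTask):
    restart_delay = timedelta(seconds=60)

    def run(self, *args, **kwargs):
        print(self.name, 'start', datetime.now())
        sleep(5)
        print(self.name, 'end', datetime.now())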
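The same flock idea can be written as a `contextlib` context manager and checked without Celery. This works because `flock` locks belong to the open file, so two worker processes (or even two separate `open()` calls in one process) conflict, and the kernel drops the lock when the file is closed or the process dies, so a crashed worker leaves no stale lock. The `file_lock` name and the lock path used below are hypothetical, and `fcntl` is Unix-only.

```python
import contextlib
import fcntl


@contextlib.contextmanager
def file_lock(path):
    """Yield True if an exclusive, non-blocking flock on path was obtained,
    False if someone else already holds it."""
    with open(path, "w") as f:
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            acquired = True
        except BlockingIOError:
            acquired = False  # another open of this file holds the lock
        try:
            yield acquired
        finally:
            if acquired:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
```

A task would then wrap its body in `with file_lock(path) as ok:` and simply return when `ok` is false, which is what the `Lock` class above does with its `is_locked` flag.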