15👍
I think I found the answer to this issue: clearing the callbacks of the current request does seem to be the right way to proceed. I wonder why such a common scenario is not documented anywhere, though.
For completeness, here is a basic code snippet:
@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    # do_stuff
    if end_chain:
        # Empty the list in place so that no further task in the chain is scheduled
        self.request.callbacks[:] = []
    ....
Update
I implemented a more elegant way to handle this and want to share it. I use a decorator called revoke_chain_authority, which revokes the chain automatically, so the code above does not have to be repeated in every task.
from functools import wraps


class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        Exception.__init__(self, "")
        # Now for your custom code...
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return: the wrapped task function.
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:  # "except X, e" is Python 2 only syntax
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value
    return inner
This decorator can be used on a shared task as follows:
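from celery import shared_task


@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    # ...
    if condition:
        # The decorator catches this, drops the rest of the chain and returns False
        raise RevokeChainRequested(False)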
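To see what the decorator does without a running worker, here is a minimal sketch that calls the wrapped function directly on a stand-in object. The make_fake_task helper and the latitude/longitude check are hypothetical, added only for illustration; the stand-in is not a Celery class and only mimics the self.request.callbacks list that the decorator touches.

```python
from functools import wraps
from types import SimpleNamespace


class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        super().__init__("")
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested as e:
            # Drop subsequent tasks by emptying the callbacks list in place
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value
    return inner


@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    # Hypothetical condition, for illustration only
    if latitude is None or longitude is None:
        raise RevokeChainRequested(False)
    return True


def make_fake_task():
    # Hypothetical stand-in for a bound task: only request.callbacks is modelled
    return SimpleNamespace(request=SimpleNamespace(callbacks=["b", "c"]))


# The chain is revoked: callbacks are emptied and the exception's value is returned
stopped = make_fake_task()
stopped_result = apply_fetching_decision(stopped, None, None)

# Nothing is raised: callbacks are left untouched
continued = make_fake_task()
continued_result = apply_fetching_decision(continued, 45.0, 9.0)
```

In a real chain the same logic runs inside the worker, where self is the bound task instance rather than this stub.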
Please note the use of @wraps. It is necessary to preserve the name and metadata of the original function. Without it, every decorated function would present itself as inner, and celery, which derives the default task name from the function name, would make a mess of calling the right wrapped task (e.g. it would always call the first registered function instead of the right one).
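The effect of @wraps on the function name can be checked with the standard library alone. The sketch below involves no Celery at all; the decorator and task names are hypothetical and only show which __name__ a decorated function ends up with.

```python
from functools import wraps


def without_wraps(func):
    # The wrapper keeps its own name, "inner"
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


def with_wraps(func):
    # @wraps copies __name__, __module__, __doc__ etc. from func onto inner
    @wraps(func)
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


@without_wraps
def first_task():
    return "first"


@without_wraps
def second_task():
    return "second"


@with_wraps
def third_task():
    return "third"


names = [first_task.__name__, second_task.__name__, third_task.__name__]
print(names)  # ['inner', 'inner', 'third_task']
```

Two functions that both report the name inner in the same module would end up with the same automatically generated task name, which matches the mix-up described above.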
16👍
As of Celery 4.0, what works for me is removing the remaining tasks from the current task instance's request with the statement:
self.request.chain = None
Let's say you have a chain of tasks a.s() | b.s() | c.s(). You can only access the self variable inside a task if you bind the task by passing bind=True as an argument to the task's decorator.
@app.task(name='main.a', bind=True)
def a(self):
    if something_happened:
        # Discard the rest of the chain (b and c)
        self.request.chain = None
If something_happened is truthy, b and c won't be executed.
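If the same code has to cover both approaches from the answers above, the two statements can be combined in one small helper. This is only a sketch: stop_chain is a hypothetical name, not a Celery API, and it assumes that whichever of request.chain or request.callbacks is populated is the one that needs clearing. It is exercised here on stand-in objects rather than real task instances.

```python
from types import SimpleNamespace


def stop_chain(task):
    """Drop the tasks that would run after `task` in its chain (hypothetical helper)."""
    request = task.request
    # Celery 4.0+ approach from the answer above: discard request.chain
    if getattr(request, "chain", None):
        request.chain = None
    # Earlier approach: empty the callbacks list in place
    callbacks = getattr(request, "callbacks", None)
    if callbacks:
        callbacks[:] = []


# Stand-ins that only model the request attributes the helper touches
new_style = SimpleNamespace(request=SimpleNamespace(chain=["b", "c"], callbacks=None))
old_style = SimpleNamespace(request=SimpleNamespace(callbacks=["b", "c"]))

stop_chain(new_style)
stop_chain(old_style)
```

Inside a bound task it would be called as stop_chain(self) in place of the direct assignment.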