[Django]-Celery 4.0.0 and Class based task workflow

8👍

✅

I’m not sure if this is the best solution, but you can use a workaround to get the behavior you want. I’m doing something similar so that I can stick an error handler on the task in a clearer way.

From the docs:

The best practice is to use custom task classes only for overriding
general behavior, and then using the task decorator to realize the
task:

@app.task(bind=True, base=CustomTask) 
def custom(self):
    print('running')

But you can put all of your task code inside CustomTask and just leave a stub in the decorated task declaration. You do have to call into the superclass of your task in the stub like so:

@app.task(bind=True, base=CustomTask)
def custom(self, *args):
    super(type(self), self).run(*args)

You then treat the decorated function declaration as a way to call into Celery’s task registration machinery. I hope something cleaner comes out in 5.0 though.
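To see why the super(type(self), self) call in the stub reaches CustomTask.run, here is a rough pure-Python sketch. It assumes the decorator builds a subclass of the base class whose run is the decorated function, which is my understanding of what Celery does for bound tasks. The task helper below is a hypothetical stand-in, not Celery's real decorator, and this CustomTask is a plain class rather than a celery.Task subclass.

```python
# Simplified stand-in for the task decorator; NOT Celery's actual code.
class CustomTask:
    """Holds the real task logic, as in the workaround."""

    def run(self, *args):
        return "running with {}".format(args)


def task(base):
    """Hypothetical decorator: builds a subclass of `base` whose run() is the stub."""
    def decorator(fun):
        # The generated class is named after the function and inherits from `base`.
        cls = type(fun.__name__, (base,), {"run": fun})
        return cls()
    return decorator


@task(base=CustomTask)
def custom(self, *args):
    # type(self) is the generated subclass, so super() resolves to CustomTask.run.
    return super(type(self), self).run(*args)


result = custom.run(1, 2)
```

One caveat with this pattern: super(type(self), self) only behaves while nothing subclasses the generated class further, since a deeper subclass would make the stub call itself recursively.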

👤shadow chris

9👍

If you use Celery 4.0.1, check the documentation that chris pointed out:

The Task class is no longer using a special meta-class that automatically registers the task in the task registry.

Now you should register your task like this:

from celery import Task

class CustomTask(Task):
    def run(self):
        print('running')

# `app` is your existing Celery application instance
app.register_task(CustomTask())
👤vvkuznetsov
