22👍
You can either write a custom loader, or use the signals.
Loaders have the on_task_init
method, which is called when a task is about to be executed,
and on_worker_init
which is called by the celery+celerybeat main process.
Using signals is probably the easiest, the signals available are:
0.8.x:
-
task_prerun(task_id, task, args, kwargs)
Dispatched when a task is about to be executed by the worker (or locally
if usingapply
/or ifCELERY_ALWAYS_EAGER
has been set). -
task_postrun(task_id, task, args, kwargs, retval)
Dispatched after a task has been executed in the same conditions as above. -
task_sent(task_id, task, args, kwargs, eta, taskset)
Called when a task is applied (not good for long-running operations)
Additional signals available in 0.9.x (current master branch on github):
-
worker_init()
Called when celeryd has started (before the task is initialized, so if on a
system supportingfork
, any memory changes would be copied to the child
worker processes). -
worker_ready()
Called when celeryd is able to receive tasks.
-
worker_shutdown()
Called when celeryd is shutting down.
Here’s an example precalculating something the first time a task is run in the process:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
If you want the function to be run for all tasks, just skip the sender
argument.