[Answered ]-Add and delete Celery periodic tasks at runtime

2👍

You can create a custom scheduler like the one below, adapted from this answer.

from django_celery_beat.schedulers import DatabaseScheduler

class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = [x for x in new_schedule.keys() if x not in self.schedule.keys()]
                to_remove = [x for x in self.schedule.keys() if x not in new_schedule.keys()]
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()
        return self._schedule

When you run celery beat, point it to this class:

celery --app=myproject beat --loglevel=info --scheduler=myproject.scheduler.AutoUpdateScheduler

Leave a comment