For many small projects, Celery is overkill. For those projects you can use schedule, which is very easy to use.
With this library you can make any function run periodically:
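import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)  # interval chosen for illustration

while True:
    schedule.run_pending()  # run every job that is currently due
    time.sleep(1)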
This example blocks the calling thread. If you look in the FAQ, you will find that you can also run the scheduler in a separate thread, so that your program is not blocked, and cancel a job once it is no longer needed:
import threading
import time

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            # `self` is the Scheduler captured from the enclosing function
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.daemon = True  # do not keep the process alive on exit
    continuous_thread.start()
    return cease_continuous_run

Scheduler.run_continuously = run_continuously
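The docstring's note about missed jobs can be checked with a small model. The sketch below is not the schedule library itself: it is a simplified simulation, assuming that a due job runs once per poll and is then rescheduled one period after the poll time. The function name count_runs and its parameters are made up for illustration.

```python
def count_runs(job_period: int, poll_interval: int, horizon: int) -> int:
    """Count job executions in a simplified model of a polling scheduler.

    All arguments are in seconds. Polls happen at poll_interval,
    2 * poll_interval, ... up to and including horizon.
    """
    runs = 0
    next_run = job_period  # the job is first due one period after registration
    for now in range(poll_interval, horizon + 1, poll_interval):
        if now >= next_run:
            runs += 1  # a due job runs once per poll, however overdue it is
            next_run = now + job_period  # rescheduled relative to this poll
    return runs


# A job registered for every minute, polled once per hour, over two hours:
print(count_runs(job_period=60, poll_interval=3600, horizon=7200))  # → 2
# The same job polled every second:
print(count_runs(job_period=60, poll_interval=1, horizon=7200))  # → 120
```

In this model the hourly poll yields one run per poll rather than sixty, which matches the behavior the docstring describes.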
Here is an example for usage in a class method:
def foo(self):
    if some_condition():
        return schedule.CancelJob  # a job can dequeue itself

# can be put in __enter__ or __init__
self._job_stop = self.scheduler.run_continuously()

logger.debug("doing foo")
self.foo()  # call foo
self.scheduler.every(5).seconds.do(
    self.foo)  # schedule foo for running every 5 seconds

# later on foo is not needed any more:
self._job_stop.set()

def __exit__(self, exec_type, exc_value, traceback):
    # if the jobs are not stopped yet, you can stop them
    self._job_stop.set()
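The start-in-__enter__, stop-in-__exit__ idea from the fragment above can be sketched with the standard library alone. This is a stand-in, not the schedule API: the class BackgroundRunner and its attributes are hypothetical, and it runs a single callable rather than a list of scheduled jobs. It waits on the stop event instead of calling time.sleep, so that stopping does not have to wait out a full interval.

```python
import threading


class BackgroundRunner:
    """Hypothetical stand-in: run `job` every `interval` seconds in a thread."""

    def __init__(self, job, interval=1.0):
        self._job = job
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def _loop(self):
        # Event.wait returns False on timeout and True once the event is set,
        # so it serves as an interruptible sleep.
        while not self._stop.wait(self._interval):
            self._job()

    @property
    def running(self):
        return self._thread.is_alive()

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._stop.set()     # ask the loop to finish
        self._thread.join()  # wait until it has actually finished
        return False         # do not swallow exceptions


first_run = threading.Event()
with BackgroundRunner(first_run.set, interval=0.01) as runner:
    first_run.wait(timeout=5)  # block until the job has run at least once
print("still running after exit:", runner.running)
```

Joining the thread in __exit__ is a design choice: it guarantees that no job is still executing once the with block has been left.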
This answer expands on Oz123’s answer a little bit.
In order to get things working, I created a file called mainapp/jobs.py to contain my scheduled jobs. Then, in my apps.py module, I put from . import jobs in the ready method. Here’s my entire apps.py file:
from django.apps import AppConfig
import os

class MainappConfig(AppConfig):
    name = 'mainapp'

    def ready(self):
        from . import jobs

        if os.environ.get('RUN_MAIN', None) != 'true':
            jobs.start_scheduler()
(The RUN_MAIN check is there because python manage.py runserver runs the ready method twice, once in each of two processes, but we only want to start the scheduler once.)
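The RUN_MAIN check can be isolated into a small helper so that its behavior is easy to see. This is a sketch under one assumption: that Django's autoreloader sets RUN_MAIN to 'true' only in the reloaded child process, so the condition is true in exactly one of the two runserver processes and also when no reloader is involved. The helper name should_start_scheduler is made up for illustration.

```python
import os


def should_start_scheduler(environ=None) -> bool:
    """Return True in processes where RUN_MAIN is not set to 'true'.

    `environ` defaults to os.environ; passing a plain dict makes the
    check easy to try out without touching the real environment.
    """
    env = os.environ if environ is None else environ
    return env.get('RUN_MAIN') != 'true'


print(should_start_scheduler({}))                    # → True
print(should_start_scheduler({'RUN_MAIN': 'true'}))  # → False
```

Inside ready(), the condition would then read if should_start_scheduler(): followed by the call that starts the scheduler.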
Now, here’s what I put in my jobs.py file. First, the imports. You’ll need to import Scheduler, threading and time as below. The F and UserHolding imports are just for what my job does; you won’t import these.
from django.db.models import F
from schedule import Scheduler
import threading
import time
from .models import UserHolding
Next, write the function you want to schedule. The following is purely an example; your function won’t look anything like this.
def give_admin_gold():
    admin_gold_holding = (UserHolding.objects
        .filter(inventory__user__username='admin', commodity__name='gold'))
    admin_gold_holding.update(amount=F('amount') + 1)
Next, monkey-patch the schedule module by adding a run_continuously method to its Scheduler class. Do this by using the code below, which is copied from Oz123’s answer.
def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            # `self` is the Scheduler captured from the enclosing function
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.daemon = True  # do not keep the process alive on exit
    continuous_thread.start()
    return cease_continuous_run

Scheduler.run_continuously = run_continuously
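Finally, define a function to create a Scheduler object, wire up your job, and call the scheduler’s run_continuously method:
def start_scheduler():
    scheduler = Scheduler()
    scheduler.every().second.do(give_admin_gold)  # interval is illustrative
    scheduler.run_continuously()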
I recommend using Celery’s task management. You can refer to this to set up the app (or "package", if you come from a JavaScript background).
Once it is set up, you can change the code to:
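from celery import shared_task

@shared_task  # assumption: the function must be registered as a Celery task
def check_shut_down():
    if not some_fun():
        # not done yet: queue this task to run again after 3 seconds
        # (countdown is an apply_async option; delay does not accept it)
        check_shut_down.apply_async((), countdown=3)
    else:
        # task completed; do something to notify yourself
        return True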
I can’t comment on oz123’s (https://stackoverflow.com/a/44897678/1108505) and Tanner Swett’s (https://stackoverflow.com/a/60244694/5378866) excellent posts, but as a final note I wanted to add that if you use Gunicorn with X workers, the section:
from django.apps import AppConfig
import os

class MainappConfig(AppConfig):
    name = 'mainapp'

    def ready(self):
        from . import jobs

        if os.environ.get('RUN_MAIN', None) != 'true':
            jobs.start_scheduler()
will be executed that same number of times, launching X schedulers at the same time.
If we want only one instance to run (for example, if the jobs create objects in the database), we have to add something like this to our gunicorn.conf.py file:
def on_starting(server):
    from app_project import jobs
    jobs.start_scheduler()
And finally, add the --preload argument to the gunicorn call.
Here is my solution, with sources noted. This class lets you create a scheduler that you can start with your app, then add and remove jobs at will. The check interval (run_pending_interval in the code) lets you trade off system resources against job execution timing.
from schedule import Scheduler
import threading
import warnings
import time
class RepeatTimer(threading.Timer):
    """Add repeated run of target to timer functionality. Source: https://stackoverflow.com/a/48741004/16466191"""
    running: bool = False

    def __init__(self, *args, **kwargs):
        threading.Timer.__init__(self, *args, **kwargs)

    def start(self) -> None:
        """Protect from running start method multiple times"""
        if not self.running:
            super(RepeatTimer, self).start()
            self.running = True
        else:
            warnings.warn('Timer is already running, cannot be started again.')

    def cancel(self) -> None:
        """Protect from running stop method multiple times"""
        if self.running:
            super(RepeatTimer, self).cancel()
            self.running = False
        else:
            warnings.warn('Timer is already canceled, cannot be canceled again.')

    def run(self):
        """Replace run method of timer to run continuously"""
        # finished.wait() returns False on timeout and True after cancel()
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)
class ThreadedScheduler(Scheduler, RepeatTimer):
    """Non-blocking scheduler. Advice taken from: https://stackoverflow.com/a/50465583/16466191"""
    def __init__(
        self,
        run_pending_interval: float,
    ):
        """Initialize parent classes"""
        Scheduler.__init__(self)
        # skips RepeatTimer.__init__ and calls threading.Timer.__init__ directly
        super(RepeatTimer, self).__init__(
            interval=run_pending_interval,
            function=self.run_pending,
        )

def print_work(what_to_say: str):
    print(what_to_say)

if __name__ == '__main__':
    my_schedule = ThreadedScheduler(run_pending_interval=1)
    job1 = my_schedule.every(1).seconds.do(print_work, what_to_say='Did_job1')
    job2 = my_schedule.every(2).seconds.do(print_work, what_to_say='Did_job2')
    my_schedule.start()  # begin polling in the background thread
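The trade-off behind the check interval can be put into numbers with a little arithmetic. The sketch below is a simplified model, not a measurement: it assumes polls happen exactly at multiples of the check interval and ignores the time the jobs themselves take. The function names are made up for illustration.

```python
def first_run_time(due: int, check_interval: int) -> int:
    """Time of the first poll at or after `due` (all values in seconds)."""
    polls_needed = -(-due // check_interval)  # ceiling division on integers
    return polls_needed * check_interval


def polls_per_hour(check_interval: int) -> int:
    """How often run_pending would be called per hour in this model."""
    return 3600 // check_interval


# A job that becomes due 5 seconds after the scheduler starts:
for check_interval in (1, 2, 10):
    lateness = first_run_time(5, check_interval) - 5
    print(check_interval, polls_per_hour(check_interval), lateness)
```

In this model a 1-second interval runs the job on time at the cost of 3600 polls per hour, while a 10-second interval needs only 360 polls per hour but runs the job 5 seconds late.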