[Django]-Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers

64👍

✅

Because Gunicorn starts with 8 workers (in your example), it forks the app into 8 processes. These 8 processes are forked from the master process, which monitors each of their statuses and has the ability to add or remove workers.

Each process gets a copy of your APScheduler object, which is initially an exact copy of the master process's APScheduler. The result is that each of the n worker processes runs its own scheduler, so every job is executed n times.

A hack around this is to run gunicorn with the following options:

env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload

The --preload flag tells Gunicorn to "load the app before forking the worker processes". By doing so, each worker is "given a copy of the app, already instantiated by the Master, rather than instantiating the app itself". This means the following code executes only once, in the master process:

from apscheduler.scheduler import Scheduler  # legacy APScheduler 2.x API

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run, seconds=JOB_INTERVAL)

Additionally, we need to set the jobstore to be anything other than :memory:. That way, although each worker is its own independent process unable to communicate with the others, using a local database (rather than in-memory storage) guarantees a single point of truth for CRUD operations on the jobstore.

from apscheduler.scheduler import Scheduler
from apscheduler.jobstores.sqlalchemy_store import SQLAlchemyJobStore  # 2.x import path

rerun_monitor = Scheduler()
rerun_monitor.add_jobstore(SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'), 'default')
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run, seconds=JOB_INTERVAL)

Lastly, we want to use the BackgroundScheduler because of its implementation of start(). When we call start() on a BackgroundScheduler, a new thread is spun up in the background that is responsible for scheduling and executing jobs. This is significant because, remember, thanks to the --preload flag start() executes only once, in the master Gunicorn process. Forked processes do not inherit the threads of their parent, so no worker runs the BackgroundScheduler thread.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_job(job_to_be_run, 'interval', seconds=JOB_INTERVAL)  # 3.x: add_job replaces add_interval_job

As a result of all of this, every Gunicorn worker has an APScheduler that has been tricked into a "STARTED" state, but isn't actually running, because it drops the threads of its parent! Each instance is also capable of updating the jobstore database, just not of executing any jobs!
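To make that concrete, here is a minimal sketch (the Flask app and the /reschedule route are illustrative additions, not part of the original answer) of a worker performing a CRUD operation on the shared jobstore through its inherited, thread-less scheduler:

from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

def job_to_be_run():
    print('running job')

rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()  # under --preload this runs once, in the master

app = Flask(__name__)

@app.route('/reschedule', methods=['POST'])
def reschedule():
    # Runs in whichever worker handles the request: a plain write to
    # jobs.sqlite. Only the process that still owns the scheduler thread
    # will ever execute the job.
    rerun_monitor.add_job(job_to_be_run, 'interval', seconds=30,
                          id='rerun', replace_existing=True)
    return 'ok'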

Check out Flask-APScheduler for a quick way to run APScheduler in a web server (like Gunicorn) and to enable CRUD operations for each job.
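For a taste of that library, a minimal sketch (the configuration key and setup follow Flask-APScheduler's documented usage; treat the details as an assumption rather than a drop-in recipe):

from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
app.config['SCHEDULER_API_ENABLED'] = True  # exposes REST endpoints for job CRUD under /scheduler

scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()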

👤The Aelfinn

28👍

I found a fix that worked for a Django project with a very similar issue. I simply bind a TCP socket when the scheduler first starts; only one process can bind a given port, so every other worker's bind attempt fails and that worker skips starting the scheduler. I think the following code can work for you as well, with minor tweaks.

import socket

try:
    # Keep a module-level reference to the socket: if it were garbage
    # collected, the port would be released and another worker could grab it.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 47200))
except socket.error:
    print("!!!scheduler already started, DO NOTHING")
else:
    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    print("scheduler started")
👤Paolo

3👍

Short answer: You can't do it properly without consequences.

I'm using Gunicorn as an example, but it is essentially the same for uWSGI. There are various hacks for running multiple processes, to name a few:

  1. use --preload option
  2. use on_starting hook to start the APScheduler background scheduler
  3. use when_ready hook to start the APScheduler background scheduler

They work to some extent, but you may run into the following problems:

  1. workers timing out frequently
  2. the scheduler hanging when there are no jobs (https://github.com/agronholm/apscheduler/issues/305)

APScheduler is designed to run in a single process, where it has complete control over adding jobs to job stores. It uses threading.Event's wait() and set() methods to coordinate; if schedulers run in different processes, that coordination doesn't work.
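To see why that coordination is per-process, here is a rough sketch (not APScheduler's actual code) of the wait()/set() pattern it relies on; a threading.Event set in one forked process is invisible to every other process:

import threading

wakeup = threading.Event()

def scheduler_loop():
    while True:
        # Sleep until the next job is due, or until add_job() wakes us early.
        wakeup.wait(timeout=5.0)
        wakeup.clear()
        # ... look up due jobs in the job store and run them ...

def add_job():
    # ... write the new job to the job store ...
    wakeup.set()  # only wakes the scheduler thread in THIS process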

It is possible to run it in Gunicorn in a single process:

  1. use only one worker process
  2. use the post_worker_init hook to start the scheduler; this makes sure the scheduler runs only in the worker process, never in the master process (see the sketch after this list)
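A minimal sketch of that setup (the job reference my_app.jobs:tick and the interval are placeholders):

# gunicorn.conf.py, used with: gunicorn -c gunicorn.conf.py --workers 1 my_app:app
from apscheduler.schedulers.background import BackgroundScheduler

def post_worker_init(worker):
    # Called in each worker after it initializes; with --workers 1 this
    # starts exactly one scheduler, and never in the master process.
    scheduler = BackgroundScheduler()
    scheduler.add_job('my_app.jobs:tick', 'interval', seconds=60)
    scheduler.start()
    worker.scheduler = scheduler  # keep a reference so it isn't collected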

The author also pointed out that sharing the job store among multiple processes isn't possible (https://apscheduler.readthedocs.io/en/stable/faq.html#how-do-i-share-a-single-job-store-among-one-or-more-worker-processes). He also provided a solution using RPyC.

While it's entirely doable to wrap APScheduler with a REST interface, you might want to consider serving it as a standalone app with one worker. In other words, if you have other endpoints, put them in another app where you can use multiple workers.
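For the standalone option, here is a minimal sketch (file name, job, and interval are illustrative): a scheduler-only process you run separately from the multi-worker web app, so exactly one scheduler exists.

# scheduler_app.py: run as its own single process, e.g. `python scheduler_app.py`
from apscheduler.schedulers.blocking import BlockingScheduler

def job_to_be_run():
    print('running job')

scheduler = BlockingScheduler()
scheduler.add_job(job_to_be_run, 'interval', seconds=60)
scheduler.start()  # blocks; this process does nothing but run scheduled jobs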

👤Xuan
