14👍
My guess is that the ThreadPoolExecutor is not what is creating the DB connection, but the threaded jobs are the ones holding the connection. I’ve had to deal with this already.
I ended up building this wrapper to ensure that connections are closed manually whenever a job finishes in a ThreadPoolExecutor. This should help ensure connections are not leaked; so far I haven’t seen any leaks while using this code.
from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection


class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(),
    this will wrap the function, and make sure that close_django_db_connection() is called
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overridden.
    """

    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from
        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py
        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            # Unpack self first; the wrapper call below needs it.
            self, *args = args
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args) - 1))
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever if this class is ever subclassed.
        return super(DjangoConnectionThreadPoolExecutor, self).submit(fn, *args, **kwargs)
Then you can just use this:
with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
    results = list(executor.map(func, args_list))
…and be confident that the connections will close.
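In recent Python versions submit() takes fn as a positional-only parameter, so the argument-unpacking logic above may not be needed. Here is a minimal sketch of the same wrapping idea under that assumption. The class name CleanupThreadPoolExecutor and its cleanup parameter are hypothetical, introduced only so the example runs without Django; this is not the answer's original code.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


class CleanupThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical variant: runs `cleanup` in the worker thread after every job."""

    def __init__(self, *args, cleanup, **kwargs):
        super().__init__(*args, **kwargs)
        self._job_cleanup = cleanup

    def submit(self, fn, /, *args, **kwargs):
        @wraps(fn)
        def wrapper(*fn_args, **fn_kwargs):
            try:
                return fn(*fn_args, **fn_kwargs)
            finally:
                # Runs in the worker thread, whether fn returned or raised.
                self._job_cleanup()

        # map() is built on submit(), so it gets the same wrapping.
        return super().submit(wrapper, *args, **kwargs)
```

With Django, one would presumably pass cleanup=connections.close_all (from django.db import connections), so each worker closes its own connections as soon as a job ends.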
1👍
The full document is here
The final solution is to find an opportunity to actively close the database connection. In our project, every time a worker thread completes a task, it closes its related connection. Because we use ThreadPoolExecutor, this is easy to do.
The code is as follows:
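from concurrent.futures import ThreadPoolExecutor
from django.db import connections

def on_done(future):
    # Usually runs in the worker thread that finished the job. If the future is
    # already done when the callback is added, it runs in the calling thread instead.
    connections.close_all()

def main():
    with ThreadPoolExecutor() as executor:
        while True:
            future = executor.submit(do, get_a_job())
            future.add_done_callback(on_done)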
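One detail of add_done_callback is worth checking before relying on this: the callback normally runs in the worker thread that finished the job, but if the future is already done when the callback is attached, it runs immediately in the calling thread. The sketch below shows both cases with plain threads and no Django. The function callback_thread_for and its parameter are hypothetical names used only for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def callback_thread_for(attach_after_done):
    """Return (callback thread id, worker thread id) for one submitted job."""
    seen = []
    release = threading.Event()

    def job():
        release.wait()  # hold the job until the main thread lets it finish
        return threading.get_ident()

    def on_done(future):
        seen.append(threading.get_ident())

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(job)
        if attach_after_done:
            release.set()
            future.result()  # the job has finished before the callback is attached
            future.add_done_callback(on_done)
        else:
            future.add_done_callback(on_done)  # attached while the job is still pending
            release.set()
    # Leaving the with block joins the worker, so the callback has run by now.
    return seen[0], future.result()
```

In the second case, a cleanup call placed in the callback would act on the calling thread's connections rather than the worker's, since Django keeps connections per thread. Closing inside the job itself avoids that ambiguity.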
1👍
django.db.close_old_connections() and CONN_MAX_AGE don’t work when you’re in other threads; they only work for the main thread.
Django always opens new connections for new threads and leaves them open with no one to close them. You should make sure to close them before returning to the main thread, using db.connections.close_all().
from django import db

def compute(job):
    try:
        result = FooModel.objects.filter(...).aggregate(...)
        bar_model = BarModel.objects.create(result)
        return bar_model
    finally:
        # Close this thread's connections even if the job raises.
        db.connections.close_all()
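To see why the cleanup has to happen inside the worker, here is a sketch that uses threading.local as a stand-in for Django's per-thread connection storage. All names (get_connection, close_connection, open_connections, leaky_job, tidy_job, run_jobs) are hypothetical and no Django is involved; it only illustrates the per-thread behaviour described above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()   # per-thread state, standing in for Django's connection storage
_lock = threading.Lock()
open_connections = set()     # ids of threads whose stand-in connection is open


def get_connection():
    """Open this thread's stand-in connection on first use."""
    if not getattr(_local, "is_open", False):
        _local.is_open = True
        with _lock:
            open_connections.add(threading.get_ident())


def close_connection():
    """Close only the calling thread's stand-in connection."""
    if getattr(_local, "is_open", False):
        _local.is_open = False
        with _lock:
            open_connections.discard(threading.get_ident())


def leaky_job(x):
    get_connection()
    return x * 2


def tidy_job(x):
    try:
        get_connection()
        return x * 2
    finally:
        close_connection()  # runs in the worker, so it sees the worker's connection


def run_jobs(job, items):
    """Run jobs in a pool, then return how many stand-in connections are still open."""
    open_connections.clear()
    with ThreadPoolExecutor(max_workers=2) as executor:
        list(executor.map(job, items))
    close_connection()  # called from the main thread: does not reach the workers
    return len(open_connections)
```

Calling run_jobs(leaky_job, range(4)) leaves at least one stand-in connection open even though the main thread calls close_connection() afterwards, while run_jobs(tidy_job, range(4)) leaves none.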
-1👍
In my case, neither connection.close() nor connections.close_all() (both inside the job), nor the CONN_HEALTH_CHECKS or CONN_MAX_AGE parameters worked.
To solve the problem I had to change the ThreadPoolExecutor to ProcessPoolExecutor. As they are independent processes, the connections are closed when the OS kills them.
For the record, I’m using Python 3.11, and Django 4.2.2 with psycopg2 2.9.6