5π
I've given up on finding a nice solution and instead wrote a function whose return value I assign to CELERY_IMPORTS. It's not nice, but it works.
Here's the code for future reference:
import os

def detect_tasks(project_root):
    """Return dotted module paths for every .py file in a 'tasks' directory under apps/."""
    tasks = []
    base = os.path.dirname(project_root)
    for root, dirs, files in os.walk(os.path.join(project_root, 'apps')):
        if os.path.basename(root) != 'tasks':
            continue
        for filename in files:
            if filename != '__init__.py' and filename.endswith('.py'):
                # Path relative to the project's parent directory, without the .py suffix
                relative = os.path.relpath(os.path.join(root, filename), base)
                tasks.append(os.path.splitext(relative)[0].replace(os.sep, '.'))
    return tuple(tasks)
And then in settings:
CELERY_IMPORTS = detect_tasks(project_root)
Where project_root would be something like this:
project_root = os.path.dirname(os.path.abspath(__file__))
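To make it easier to see what the function returns, here is a hedged pathlib-based sketch of the same idea, run against a throwaway directory tree. The names detect_task_modules, build_demo_tree, myproject and billing are made up for the illustration and are not part of the original answer.

```python
import tempfile
from pathlib import Path


def detect_task_modules(project_root):
    """Return dotted module paths for .py files inside any 'tasks' directory under apps/.

    Paths are made relative to the parent of project_root, mirroring the
    function above, so the project package name is part of the result.
    """
    project_root = Path(project_root)
    base = project_root.parent
    modules = []
    for path in sorted((project_root / "apps").rglob("*.py")):
        if path.parent.name == "tasks" and path.name != "__init__.py":
            relative = path.relative_to(base).with_suffix("")
            modules.append(".".join(relative.parts))
    return tuple(modules)


def build_demo_tree(tmp_dir):
    """Create a hypothetical layout: myproject/apps/billing/tasks/*.py"""
    tasks_dir = Path(tmp_dir) / "myproject" / "apps" / "billing" / "tasks"
    tasks_dir.mkdir(parents=True)
    for name in ("__init__.py", "invoices.py", "emails.py"):
        (tasks_dir / name).touch()
    # A module outside a 'tasks' directory should be ignored.
    (tasks_dir.parent / "models.py").touch()
    return Path(tmp_dir) / "myproject"


with tempfile.TemporaryDirectory() as tmp:
    demo_modules = detect_task_modules(build_demo_tree(tmp))
    print(demo_modules)
```

The resulting tuple has the same shape as the value assigned to CELERY_IMPORTS in the settings line above.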
15π
You could just define __all__ in your tasks/__init__.py file, without any additional Celery settings changes.
For example:
# app/tasks/__init__.py
from .file_with_tasks_1 import task1, task2
from .file_with_tasks_2 import task3
__all__ = [
    'task1',
    'task2',
    'task3',
]
Tested on celery v4.4.6
8π
The only reason that celery defaults to searching tasks.py is the default argument to autodiscover_tasks:
./loaders/base.py:def autodiscover_tasks(packages, related_name='tasks'):
If you use the configuration recommended by the docs, you can simply call autodiscover_tasks again with a non-default related_name for each additional module name in which you expect to find tasks. For example, here is our celery.py:
from __future__ import absolute_import
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings")
from django.conf import settings
app = Celery('app')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, related_name='tasks2')
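As a rough mental model of what related_name changes, the sketch below tries to import "<package>.<related_name>" for each package and skips packages that have no such module. This is a simplified stand-in written with the standard library only, not Celery's actual code; the function name discover and the packages demo_app_a and demo_app_b are hypothetical.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def discover(packages, related_name="tasks"):
    """Try to import '<package>.<related_name>' for each package; skip packages without it."""
    found = []
    for package in packages:
        module_name = f"{package}.{related_name}"
        try:
            importlib.import_module(module_name)
        except ModuleNotFoundError as exc:
            # Only swallow "this module does not exist"; re-raise anything else.
            if exc.name != module_name:
                raise
            continue
        found.append(module_name)
    return found


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical layout: demo_app_a has tasks.py and tasks2.py, demo_app_b only tasks.py.
    layout = {"demo_app_a": ["tasks", "tasks2"], "demo_app_b": ["tasks"]}
    for package, modules in layout.items():
        package_dir = Path(tmp) / package
        package_dir.mkdir()
        (package_dir / "__init__.py").touch()
        for module in modules:
            (package_dir / f"{module}.py").touch()
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        default_hits = discover(["demo_app_a", "demo_app_b"])
        tasks2_hits = discover(["demo_app_a", "demo_app_b"], related_name="tasks2")
    finally:
        sys.path.remove(tmp)
    print(default_hits)
    print(tasks2_hits)
```

Under this model, the second autodiscover_tasks call in the celery.py above adds the tasks2 modules on top of the default tasks modules rather than replacing them.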
4π
proj/
    app/
        tasks/
            __init__.py
            task1.py
            task2.py
            ...
If your file structure looks like the one above, you can do the following:
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
app = Celery('celery')
app.config_from_object('django.conf:settings', namespace='CELERY')

for app_name in settings.INSTALLED_APPS:
    if app_name.startswith('django'):
        continue
    for root, dirs, files in os.walk(app_name + '/tasks'):
        for file in files:
            if file.startswith('__') or file.endswith('.pyc') or not file.endswith('.py'):
                continue
            file = file[:-3]
            app.autodiscover_tasks([app_name + '.tasks'], related_name=file)
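One caveat with the loop above: os.walk(app_name + '/tasks') takes a relative path, so it depends on the current working directory, and it will not find anything for dotted app names. A possible alternative, sketched here with the standard library only, is to ask the import system which modules a tasks package contains. The helper task_module_names and the package demo_shop are hypothetical names used for the illustration.

```python
import importlib
import pkgutil
import sys
import tempfile
from pathlib import Path


def task_module_names(package_name):
    """Return the names of the plain modules directly inside the given package."""
    package = importlib.import_module(package_name)
    return sorted(
        info.name
        for info in pkgutil.iter_modules(package.__path__)
        if not info.ispkg
    )


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical layout matching the tree above: demo_shop/tasks/{__init__,task1,task2}.py
    tasks_dir = Path(tmp) / "demo_shop" / "tasks"
    tasks_dir.mkdir(parents=True)
    (tasks_dir.parent / "__init__.py").touch()
    for name in ("__init__.py", "task1.py", "task2.py"):
        (tasks_dir / name).touch()
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        names = task_module_names("demo_shop.tasks")
    finally:
        sys.path.remove(tmp)
    print(names)
```

Each returned name could then be passed as related_name to app.autodiscover_tasks([app_name + '.tasks'], related_name=name), in the same way as the loop above does with file names.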
0π
I tried the code snippet from @oloform, but it did not work for me. I have a very simple idea instead: since Celery looks for tasks in tasks.py, I define my tasks in any file and then register them in tasks.py, like this:
# tasks.py
from app.utilities.somename import upload_done
from project.celery import app
app.task(upload_done)
The registered task then shows up in the Celery worker's startup log:
-------------- celery@######## v3.1.18 (Cipater)
---- **** -----
--- * *** * -- ###############
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: project:0x2aae5c0
- ** ---------- .> transport: django://localhost//
- ** ---------- .> results: djcelery.backends.database:DatabaseBackend
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. app.utilities.somename.upload_done
It is a dirty approach, but it works.