2👍
EDIT: Moved to django-channels now, works well but more complex than solution below.
Previous:
Ok, so below is pseudocode for how I've solved it for now. Basically I use https://pusher.com/docs/javascript_quick_start and, server-side, pass the instantiated object into the compute_module. One downside is that the Pusher messages are ephemeral, so I'm going to have to do some extra work in LogPusher to store them in a DB, something for another day…
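As a rough idea of what that extra work in LogPusher could look like, here is a minimal sketch that saves each message before forwarding it. Everything in it is an assumption for illustration: the StoringLogPusher name, the task_log table, the use of sqlite3 instead of the Django ORM, and the FakeClient stand-in (used so the sketch runs without Pusher credentials; a real pusher.Pusher instance exposes the same trigger(channel, event, data) call).

```python
import json
import sqlite3
import time


class StoringLogPusher:
    """Hypothetical variant of LogPusher: save each message, then forward it."""

    def __init__(self, client, channel, event, db):
        self.client = client  # anything with trigger(channel, event, data)
        self.channel = channel
        self.event = event
        self.db = db
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS task_log "
            "(id INTEGER PRIMARY KEY, event TEXT, payload TEXT, created REAL)"
        )

    def send(self, data):
        payload = json.dumps(data)
        # Store first, so the message survives even if nobody is subscribed yet.
        self.db.execute(
            "INSERT INTO task_log (event, payload, created) VALUES (?, ?, ?)",
            (self.event, payload, time.time()),
        )
        self.db.commit()
        self.client.trigger(self.channel, self.event, payload)

    def history(self):
        """Return every stored message for this event, oldest first."""
        rows = self.db.execute(
            "SELECT payload FROM task_log WHERE event = ? ORDER BY id",
            (self.event,),
        )
        return [json.loads(payload) for (payload,) in rows]


class FakeClient:
    """Stand-in for pusher.Pusher so the sketch runs without credentials."""

    def __init__(self):
        self.sent = []

    def trigger(self, channel, event, data):
        self.sent.append((channel, event, data))


db = sqlite3.connect(":memory:")
client = FakeClient()
log = StoringLogPusher(client, "my-channel", "event", db)
log.send({"status": "loading file"})
log.send({"status": "computing"})
print(log.history())
```

A page that connects late could then fetch history() once and replay the messages it missed before listening for live ones.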
Also, in my real implementation I trigger the task via a $.post() AJAX call in $(document).ready(), because small tasks completed so fast that the user would never see the Pusher messages: the connection wasn't established yet (back to that historic message problem).
Another alternative route which I hadn’t mentioned above is https://channels.readthedocs.io/en/latest/
[Edit] Another solution is server-sent events (SSE), which has Django implementations; I haven't tested it. But it looks good for unidirectional updates, e.g. from server to client (vs. WebSockets, which are bidirectional). You would need a messaging system like Redis pub/sub to get updates to the server's SSE route.
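For a feel of what the SSE route would have to emit, here is a small untested-in-Django sketch of the wire format only: each update is sent as an optional "event:" line, one or more "data:" lines, and a blank line. The sse_event and status_stream names are made up for this example, and the list of updates stands in for whatever would actually arrive from Redis pub/sub.

```python
import json


def sse_event(data, event=None):
    """Format one Server-Sent Events frame as text."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Every line of the payload gets its own "data:" prefix.
    for line in json.dumps(data).splitlines():
        lines.append(f"data: {line}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


def status_stream(updates):
    """Yield one SSE frame per status update (hypothetical update source)."""
    for update in updates:
        yield sse_event(update, event="status")


frames = list(status_stream([{"status": "loading file"}, {"status": "computing"}]))
print("".join(frames))
```

In Django, a generator like status_stream could presumably be handed to StreamingHttpResponse with content_type="text/event-stream", and the browser would read it with EventSource.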
Front-end updates from django server via pusher:
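# views.py
from django.shortcuts import render
from .tasks import run_task

def view_task(request):
    # Queue the Celery task, then render the page that listens for its events.
    run_task.delay('event')
    return render(request, 'template.html', {'pusher_event': 'event'})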
# tasks.py
import json

import pusher
from celery import shared_task
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                                           key=settings.PUSHER_KEY,
                                           secret=settings.PUSHER_SECRET,
                                           cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event

    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):
    log_pusher = LogPusher(pusher_event)
    # compute_fct sends its own status updates through log_pusher while it runs
    result = compute_fct(log_pusher)
    if result['status'] == 'error':
        log_pusher.send({'status': 'error'})
    else:
        log_pusher.send({'status': 'success'})
# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send({'status': 'loading file'})
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send({'status': 'computing'})
    val = df['col'].mean()
    # mean() yields NaN (not None) when there is nothing to average
    if pd.isna(val):
        return {'status': 'error'}
    else:
        return {'status': 'success', 'val': val}
# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings

def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER, 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL}
# template.html
<script>
  var pusher = new Pusher("{{PUSHER_KEY}}", {
    cluster: "{{PUSHER_CLUSTER}}",
    encrypted: true
  });
  var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
  channel.bind("{{pusher_event}}", function(data) {
    // process data
  });
</script>
1👍
The only way I’ve managed to get realtime statuses is to simply put some SQL writes/API calls into the task itself. Doing things with the return value of the task is far easier, since you can just write a custom task class.
I’m not entirely sure how this works using Django, but it should look something like this:
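import time
import celery
from celery import shared_task

class CustomTask(celery.Task):
    def __call__(self, *args, **kwargs):
        self.start_time = time.time()
        # Run the task body; without this call the task would never execute.
        return super().__call__(*args, **kwargs)

    def on_success(self, retval, task_id, args, kwargs):
        do_success_stuff()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        do_failure_stuff()

@shared_task(base=CustomTask)
def do_stuff():
    return create_widgets()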
The full list of handlers can be found here:
http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers
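As for the first approach, putting SQL writes into the task itself, a minimal sketch could look like the following. It uses plain sqlite3 so it runs on its own; in a Django project the ORM would presumably take its place. The task_status table, the set_status/get_status helpers, and this body for create_widgets are all invented for the example.

```python
import sqlite3
import time


def set_status(db, task_id, status):
    """Overwrite the single status row kept for this task."""
    db.execute(
        "INSERT OR REPLACE INTO task_status (task_id, status, updated) "
        "VALUES (?, ?, ?)",
        (task_id, status, time.time()),
    )
    db.commit()


def get_status(db, task_id):
    """What a polling view would call to read the latest status."""
    row = db.execute(
        "SELECT status FROM task_status WHERE task_id = ?", (task_id,)
    ).fetchone()
    return row[0] if row else None


def create_widgets(db, task_id, count):
    """Stand-in for the task body: report progress as the work proceeds."""
    set_status(db, task_id, "started")
    widgets = []
    for i in range(count):
        widgets.append(f"widget-{i}")
        set_status(db, task_id, f"{i + 1}/{count} done")
    set_status(db, task_id, "success")
    return widgets


db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE task_status (task_id TEXT PRIMARY KEY, status TEXT, updated REAL)"
)
widgets = create_widgets(db, "abc", 3)
print(get_status(db, "abc"))
```

The front end would then poll an endpoint backed by something like get_status while the task is running.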
0👍
There is a library called celery-progress that might be helpful:
celery-progress library
Its author also wrote a blog post about doing it manually:
blog about celery progress bars
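One common way to do it manually (not necessarily what the blog post does) relies on Celery's update_state: a bound task, declared with @shared_task(bind=True), can call self.update_state(state=..., meta=...) and the front end polls the result backend for that meta. The sketch below only shows the bookkeeping. report_progress, the "PROGRESS" state name, and the meta keys are assumptions, and FakeTask stands in for the bound task so the example runs without a Celery worker.

```python
def report_progress(task, current, total):
    """Push a custom PROGRESS state with a small meta dict to the task."""
    percent = round(100 * current / total, 1) if total else 0.0
    meta = {"current": current, "total": total, "percent": percent}
    task.update_state(state="PROGRESS", meta=meta)
    return meta


class FakeTask:
    """Stand-in for a bound Celery task; records what would be stored."""

    def __init__(self):
        self.states = []

    def update_state(self, state=None, meta=None):
        self.states.append((state, meta))


task = FakeTask()
for i in range(4):
    # ... one unit of real work would happen here ...
    report_progress(task, i + 1, 4)
print(task.states[-1])
```

Inside a real task the call would be report_progress(self, i + 1, total), and a status view could read the stored state and meta back through AsyncResult.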