27👍
In order for Celery to know what the current state of the task is, it sets some metadata in whatever result backend you have. You can piggy-back on that to store other kinds of metadata.
from celery import task

def yielder():
    for i in range(2**100):
        yield i

@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.backend.mark_as_started(
            report_progress.request.id,
            progress=progress)
from celery.result import AsyncResult

def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress
I wouldn’t throw a ton of data in there, but it works well for tracking the progress of a long-running task.
14👍
Paul’s answer is great. As an alternative to using mark_as_started, you can use Task‘s update_state method. They ultimately do the same thing, but the name “update_state” is a little more appropriate for what you’re trying to do. You can optionally define a custom state that indicates your task is in progress (I’ve named my custom state ‘PROGRESS’):
from celery import task

def yielder():
    for i in range(2**100):
        yield i

@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.update_state(state='PROGRESS', meta={'progress': progress})
from celery.result import AsyncResult

def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress
6👍
Celery part:
from celery import task
from django.core.cache import cache

def long_func(*args, **kwargs):
    i = 0
    while True:
        yield i
        do_something_here(*args, **kwargs)
        i += 1

@task()
def test_yield_task(task_id=None, **kwargs):
    # old-style Celery passes task_id in as a magic kwarg;
    # on modern Celery, use bind=True and read self.request.id instead
    the_progress = 0
    for the_progress in long_func(**kwargs):
        cache.set('celery-task-%s' % task_id, the_progress)
Web-client side, starting the task:
r = test_yield_task.apply_async()
request.session['task_id'] = r.task_id
Checking the last yielded value:

v = cache.get('celery-task-%s' % request.session.get('task_id'))
if v:
    do_something()
If you don’t want to use the cache, or it’s not available, you can use the database, a file, or any other place that both the Celery worker and the server side can access. The cache is the simplest solution, but the workers and the server have to share the same cache backend.
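As a minimal sketch of the file-based alternative (the helper names, the directory, and the JSON encoding are my own assumptions, not part of the answer above), both sides only need read/write access to the same path:

```python
import json
import os
import tempfile

# Hypothetical shared location; in practice this would be a path that
# both the Celery worker and the web process can reach.
progress_dir = tempfile.mkdtemp()

def set_progress(task_id, value):
    # worker side: called inside the task loop instead of cache.set()
    with open(os.path.join(progress_dir, 'celery-task-%s' % task_id), 'w') as f:
        json.dump(value, f)

def get_progress(task_id):
    # server side: polled by the view instead of cache.get()
    path = os.path.join(progress_dir, 'celery-task-%s' % task_id)
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)

set_progress('abc123', 42)
print(get_progress('abc123'))  # 42
```

The trade-off versus the cache is durability and cleanup: files survive worker restarts, but you have to delete them yourself once the task finishes.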
4👍
A couple of options to consider:
1 — task groups. If you can enumerate all the sub-tasks at invocation time, you can apply the group as a whole. That returns a TaskSetResult object you can use to monitor the results of the group as a whole, or of individual tasks in the group; query it as needed when you want to check status.
2 — callbacks. If you can’t enumerate all the sub-tasks (or even if you can!), you can define a webhook / callback as the last step in the task, invoked when the rest of the task completes. The hook would hit a URI in your app that ingests the result and makes it available via the DB or an app-internal API.
Some combination of these could solve your challenge.
3👍
See also this great PyCon presentation from one of the Instagram engineers:
http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html
At the 16:00 mark in the video, he discusses how they structure long lists of sub-tasks.