6👍
While tinkering around in the shell (using IPython's tab auto-completion) I found that group_task (which is a celery.result.ResultSet object) has a method called completed_count, which gave exactly what I needed.
The method is also documented at http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_count
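A minimal sketch of the polling idea: call completed_count() in a loop until the set is ready. Since a real ResultSet needs a running broker and worker, FakeResultSet below is a hypothetical stand-in that only mimics the two methods the loop uses; in a real app, group_task would come from invoking a celery group.

```python
import time

class FakeResultSet:
    """Hypothetical stand-in for celery.result.ResultSet (no broker needed)."""
    def __init__(self, total):
        self.total = total
        self._done = 0

    def completed_count(self):
        # The real completed_count() returns how many child tasks
        # have finished successfully; here we just simulate progress.
        self._done = min(self._done + 3, self.total)
        return self._done

    def ready(self):
        # The real ready() is True once every child task has finished.
        return self._done >= self.total

def show_progress(result_set, total, poll_interval=0.0):
    """Poll completed_count() until all subtasks finish."""
    counts = []
    while not result_set.ready():
        done = result_set.completed_count()
        counts.append(done)
        print(f"{done}/{total} tasks complete")
        time.sleep(poll_interval)
    return counts

progress = show_progress(FakeResultSet(10), 10)
```

With a real group_task the loop body is identical; only the object behind it changes.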
5👍
Here’s a full working example based on @dalore’s answer.
First, tasks.py:
import time

from celery import Celery, group

app = Celery('tasks', broker='pyamqp://guest@127.0.0.1//', backend='redis://localhost')

@app.task(trail=True)
def add(x, y):
    time.sleep(1)
    return x + y

@app.task(trail=True)
def group_add(l1, l2):
    return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()
Start a Redis server using Docker: docker run --name my-redis -p 6379:6379 -d redis

Start RabbitMQ using Docker: docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine

Start a single-process celery worker in a separate shell: celery -A tasks worker --loglevel=info -c 1

Then run the test script below.
from tasks import group_add
from tqdm import tqdm

total = 10
l1 = range(total)
l2 = range(total)
delayed_results = group_add.delay(l1, l2)
delayed_results.get()  # Wait for the parent task to be ready.
results = []
for result in tqdm(delayed_results.children[0], total=total):
    results.append(result.get())
print(results)
You should see something like the following with the progress bar increasing by 10% every second.
50%|#####     | 5/10 [00:05<00:05, 1.01s/it]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Finally, clean up your redis and rabbitmq containers.
docker stop my-rabbit my-redis
docker rm my-rabbit my-redis
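Why the per-child result.get() loop above yields incremental progress: iterating the children in order blocks only until each one finishes, so the bar advances task by task. The same pattern can be sketched without a broker using concurrent.futures (a stand-in here, not part of the answer's Celery stack):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def add(x, y):
    time.sleep(0.01)  # simulate a slow task, like the celery add() above
    return x + y

total = 10
with ThreadPoolExecutor(max_workers=1) as pool:
    # Like the celery group: one future per (x, y) pair.
    futures = [pool.submit(add, x, y) for x, y in zip(range(total), range(total))]
    # Like `for result in delayed_results.children[0]: result.get()`:
    # block on each child in submission order, so a progress bar
    # wrapped around this loop advances one task at a time.
    results = [f.result() for f in futures]

print(results)
```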
1👍
Reading the documentation for AsyncResult, I found there is a collect method that collects results as they come in.
from celery import group
from proj.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2
Example output:
>>> from celery.result import ResultBase
>>> from proj.tasks import A
>>> result = A.delay(10)
>>> [v for v in result.collect()
... if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Note: the Task.trail option must be enabled so that the list of children is stored in result.children. This is the default, but it is enabled explicitly here for illustration.
Edit: upon further testing I have found that whilst collect states it will collect results, it still waits. To get incremental progress you need to get the result of each child, like so:
group_result = mygrouptask.delay().get()
for result in tqdm(group_result.children, total=count):
    yield result.get()
tqdm displays a progress bar in the console.

The mygrouptask returns a celery group like so:
return group(mytask.s(arg) for arg in args)()
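The yield-as-ready loop above can be sketched without a broker. fake_get below is a hypothetical stand-in for AsyncResult.get(); the generator mirrors the edit's loop, reporting progress the way tqdm would as each child's value arrives:

```python
import time

def fake_get(i):
    """Stand-in for AsyncResult.get(): blocks until child i is done."""
    time.sleep(0.01)
    return i * i

def progress_results(children, count):
    """Mirror of the loop above: yield each child's result as it arrives."""
    for n, child in enumerate(children, start=1):
        value = fake_get(child)     # result.get() in the real code
        print(f"{n}/{count} done")  # tqdm renders this as a progress bar
        yield value

results = list(progress_results(range(4), 4))
```

Because the function is a generator, callers see each result as soon as its child finishes rather than after the whole group completes.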