[Django]-Python Celery group() – TypeError: […] argument after ** must be a mapping, not long

4👍

subtask takes a tuple of args, a dict of kwargs and task options so it should be called like this:

    all_tasks.append(write_email_bunch.subtask((some_users, execnum)))

note that we are passing it a tuple containing the args

Also you shouldn’t wait on a task inside a task – this can cause deadlocks. In this case I reckon daily_emails does not need to be a celery task – it can be a regular function that creates a canvas object and runs apply async.

def daily_emails():

    all_tasks = []

    for chunk in range(0, users.count(), 1000):
        some_users = users[chunk:chunk+1000]
        all_tasks.append(write_email_bunch.subtask(some_users, execnum))

    job = group(all_tasks)
    result = job.apply_async()
    return result.id

2👍

In addition to the other answer you could be using chunks here:
http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks

@app.task
def daily_emails():
    return write_email.chunks(users, 1000).delay()

@task
def write_email(user):
    [...]

It may be beneficial to do it manually if getting several objects at once
from the db is important. You should also consider that the model objects will be serialized here, to avoid that you can send the pk only and refetch the model in the task, or send the fields that you care about (like email address or whatever is required to send that email to the user).

👤asksol

Leave a comment