[Answered ]-Calling tasks recursively on a tree structure

2👍

Use chords to execute a task that is dependent upon the results of a bunch of tasks.

as i could not understand exactly how you needed the recursive tasks to be called, i implemented a reference example of merge-sort.

note that this will not work on celery 3.2.0+ as calling get inside a task will result in Exception.

from celery import Celery, chord
app = Celery('tasks', backend='amqp', broker='amqp://')
app.conf.CELERY_RESULT_BACKEND = 'amqp'


def mergesort(list_obj):
    '''normal mergesort
    '''
    if len(list_obj) <= 1:
        return list_obj
    middle = len(list_obj) / 2
    left, right = list_obj[:middle], list_obj[middle:]
    return list(merge(list(mergesort(left)), list(mergesort(right))))

def merge(left, right):
    '''normal merge
    '''
    while 1:
        if left == []:
            for j in right:
                yield j
            break
        elif right == []:
            for j in left:
                yield j
            break
        elif left[0] < right[0]:
            yield left.pop(0)
        else:
            yield right.pop(0)

def merge2(left_r, right_r):
    '''celery merge
    '''
    left =left_r.get()
    right = right_r.get()
    while 1:
        if left == []:
            for j in right:
                yield j
            break
        elif right == []:
            for j in left:
                yield j
            break
        elif left[0] < right[0]:
            yield left.pop(0)
        else:
            yield right.pop(0)
@app.task
def merge_c(in_list):
    '''celery merge
    '''
    #unpack
    print '*'*21 + str( in_list)
    left, right = in_list
    return list(merge2(left, right))

@app.task
def same_object(l_obj):
    '''helper function to convert list to `result`
    '''
    return l_obj

@app.task
def mergesort_c(list_obj):
    '''celery mergesort
    '''
    if len(list_obj) <= 1:
        # make sure that you return a `result` object for merge
        return same_object.delay(list_obj)
    middle = len(list_obj) / 2
    left, right = list_obj[:middle], list_obj[middle:]
    # finish mergesort (left) and mergesort(right) and merge them
    res = chord([mergesort_c.s(left), mergesort_c.s(right)])(merge_c.s())
    return res

if __name__ == '__main__':
    l = [2,1, 3]
    #normal mergesort
    print mergesort(l) #[1, 2, 3, 3, 5]
    # with celery
    res = mergesort_c(l)
    print res.get()

Leave a comment