[Answer]-Python multiprocess Process never terminates

0đź‘Ť

âś…

Ok after some fiddling (and a good night’s sleep) I believe I’ve figured out the problem (and thank you Eri, you were the inspiration I needed). The main issue of the zombie processes was that I was not signaling back that the process was finished (and killing it) both of which I (naively) thought was happening automagically with multiprocess.

The code that worked:

# function that will be run through the pool
def process_request(req):
    try:
            dr = urllib2.urlopen(req, timeout=30)

    except Exception, e:
            logging.error(traceback.print_exc())

# process killer
def sig_end(r):
    sys.exit()

# globals
MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
CHUNK_SIZE = 20
POOL = multiprocessing.Pool(MP_CONCURRENT)    

# pool initiator
def request_manager(req_list):
    try:
            resp = POOL.map_async(process_request, req_list, CHUNK_SIZE, callback=sig_end)

    except Exception, e:
            logging.error(traceback.print_exc())

A couple of notes:

1) The function that will be hit by “map_async” (“process_request” in this example) must be defined first (and before the global declarations).

2) There is probably a more graceful way to exit the process (suggestions welcome).

3) Using pool in this example really was best (thanks again Eri) due to the “callback” feature which allows me to throw a signal right away.

👤Kruunch Arz

1đź‘Ť

Maybe i am not right, but

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)



def request_manager(req_list):
    try:
            # put request list in the queue
            pool=[]
            for req in req_list:
                    MPQ.put(req)

                    # call processes on queue
                    worker = multiprocessing.Process(target=process_request, args=(MPQ,))
                    worker.daemon = True
                    worker.start()
                    pool.append(worker)

            # move on after queue is empty
            MPQ.join()
            # Close not needed processes
            for p in pool: p.terminate()

    except Exception, e:
            logging.error(traceback.print_exc())


# prcoess requests in queue
def process_request(MPQ):
    try:
            while True:
                    req = MPQ.get()
                    dr = urllib2.urlopen(req)
                    MPQ.task_done()

    except Exception, e:
            logging.error(traceback.print_exc())
👤eri

0đź‘Ť

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)
CHUNK_SIZE = 20 #number of requests sended to one process.
pool = multiprocessing.Pool(MP_CONCURRENT)

def request_manager(req_list):
    try:
            # put request list in the queue
            responce=pool.map(process_request,req_list,CHUNK_SIZE) # function exits after all requests called and pool work ended
    # OR
            responce=pool.map_async(process_request,req_list,CHUNK_SIZE) #function request_manager exits after all requests passed to pool

    except Exception, e:
            logging.error(traceback.print_exc())


# prcoess requests in queue
def process_request(req):
    dr = urllib2.urlopen(req)

This works ~5-10x faster then your code

👤eri

0đź‘Ť

Integrate side “brocker” to django (such as rabbitmq or something like it).

👤eri

Leave a comment