[Answered] Django multiprocessing and empty queue after put


There are two issues with the current code. 1) With multiprocessing (but not threading), the qsize() method is unreliable, so I suggest you don't use it; it is only confusing. 2) You can't modify an object directly once it has been taken off a queue.

Consider two processes sending data back and forth. One won't know whether the other has modified some data, because each process's data is private. To communicate, send data explicitly, with Queue.put() or over a Pipe, as in the sketch below.
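
To illustrate the second point, here is a minimal sketch (not the poster's code; the names are made up for the demo). A child process that mutates a dict it pulled off a multiprocessing.Queue only changes its own private copy, and it has to put the result back onto a queue (or send it down a Pipe) for the parent to see it:

import multiprocessing

def child(inq, outq):
    job = inq.get()        # a private copy of the parent's dict
    job['count'] += 1      # the change is visible only inside this process
    outq.put(job)          # communicate the change back explicitly

if __name__ == '__main__':
    inq, outq = multiprocessing.Queue(), multiprocessing.Queue()
    job = {'count': 0}
    inq.put(job)
    p = multiprocessing.Process(target=child, args=(inq, outq))
    p.start()
    print(outq.get())      # {'count': 1} -- came back over the queue
    print(job)             # {'count': 0} -- the parent's original is untouched
    p.join()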

The general way a producer/consumer system works is this: 1) jobs are stuffed into a queue; 2) a worker blocks, waiting for work, and when a job appears it puts the result on a different queue; 3) a manager or 'beancounter' process consumes the output from the second queue and prints it or otherwise processes it. The example below does exactly that.

Have fun!

#!/usr/bin/env python

import logging, multiprocessing, sys


def myproc(arg):
    # the per-job work: double the input
    return arg * 2

def worker(inqueue, outqueue):
    # block for jobs on the input queue, process each one,
    # and put the result on the output queue
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        job = inqueue.get()
        logger.info('got %s', job)
        outqueue.put(myproc(job))

def beancounter(inqueue):
    # consume and report results as they arrive
    while True:
        print('done:', inqueue.get())

def main():
    logger = multiprocessing.log_to_stderr(
            level=logging.INFO,
    )
    logger.info('setup')

    data_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    # queue up a handful of jobs before the worker starts
    for num in range(5):
        data_queue.put(num)

    worker_p = multiprocessing.Process(
        target=worker, args=(data_queue, out_queue), 
        name='worker',
    )
    worker_p.start()

    bean_p = multiprocessing.Process(
        target=beancounter, args=(out_queue,),
        name='beancounter',
    )
    bean_p.start()

    worker_p.join()
    bean_p.join()
    logger.info('done')


if __name__=='__main__':
    main()
šŸ‘¤ johntellsall


I've got it. I do not know why, but when I tried "threading", it worked!

import logging
import threading
from queue import Queue, Empty

MailLogger = logging.getLogger('mail')


class TaskMaker(threading.Thread):

    def __init__(self, que):
        threading.Thread.__init__(self)
        self.queue = que

    def run(self):
        while True:
            try:
                print("start", self.queue.qsize())
                _data = self.queue.get()      # blocks until a task is queued
                if _data:
                    print("make")
                    _data['function'](*_data['args'], **_data['kwargs'])
            except Empty:
                pass
            except Exception as e:
                print(e)
                MailLogger.error(e)

tasks = Queue()
stream = TaskMaker(tasks)
stream.start()


def add_task(func=lambda: None, args=(), kwargs={}):
    global tasks
    try:
        tasks.put_nowait({
            'function': func,
            'args': args,
            'kwargs': kwargs
        })

    except Exception as e:
        print(e)
        MailLogger.error(e)
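
For completeness, a small usage sketch; notify and its arguments here are hypothetical stand-ins for whatever function the task is meant to run:

def notify(address, subject='hello'):
    # hypothetical task body standing in for the real mail-sending call
    print('mailing', address, subject)

# put_nowait queues it immediately; the TaskMaker thread picks it up and runs it
add_task(notify, args=('user@example.com',), kwargs={'subject': 'weekly report'})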
