There are two issues with the current code. 1) With multiprocessing (but not threading), the qsize() function is unreliable: it only returns an approximate size. I suggest you don't use it, as it is confusing. 2) You can't directly modify an object that's been taken from a queue and expect the other side to see the change.
Consider two processes sending data back and forth. One won't know if the other has modified some data, because each process's data is private. To communicate, send data explicitly, with Queue.put() or using a Pipe.
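A minimal sketch of the second point, assuming Python 3. The helper names `roundtrip` and `demo` are made up for illustration. It stays in a single process to keep things short: a thread-style `queue.Queue` hands back the very same object, while a `multiprocessing.Queue` pickles the object on the way in, so what comes out is a separate copy.

```python
import multiprocessing
import queue


def roundtrip(q, obj):
    """Put obj on q, take it straight back off, and return what came out."""
    q.put(obj)
    return q.get(timeout=5)


def demo():
    original = {'count': 0}

    # Thread queue: the consumer receives a reference to the same dict.
    from_thread_queue = roundtrip(queue.Queue(), original)

    # Process queue: the dict is pickled, so the consumer receives a copy.
    from_process_queue = roundtrip(multiprocessing.Queue(), original)
    from_process_queue['count'] = 99  # changes only the copy

    return (from_thread_queue is original,
            from_process_queue is original,
            original['count'])


if __name__ == '__main__':
    print(demo())  # (True, False, 0)
```

This is also the likely reason code that mutates queued objects appears to work with threads but not with processes.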
The general way a producer/consumer system works is this: 1) jobs are stuffed into a queue. 2) a worker blocks, waiting for work. When a job appears, it processes the job and puts the result on a different queue. 3) a manager or "beancounter" process consumes the output from the second queue, and prints it or otherwise processes it.
Have fun!
#!/usr/bin/env python3
import logging
import multiprocessing


def myproc(arg):
    # The actual "work": double the input.
    return arg * 2


def worker(inqueue, outqueue):
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        # Blocks until a job is available.
        job = inqueue.get()
        logger.info('got %s', job)
        outqueue.put(myproc(job))


def beancounter(inqueue):
    while True:
        # Blocks until a result is available, then reports it.
        print('done:', inqueue.get())


def main():
    logger = multiprocessing.log_to_stderr(
        level=logging.INFO,
    )
    logger.info('setup')

    data_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    for num in range(5):
        data_queue.put(num)

    worker_p = multiprocessing.Process(
        target=worker, args=(data_queue, out_queue),
        name='worker',
    )
    worker_p.start()

    bean_p = multiprocessing.Process(
        target=beancounter, args=(out_queue,),
        name='beancounter',
    )
    bean_p.start()

    # Both processes loop forever, so these joins block until the
    # program is interrupted (e.g. with Ctrl-C).
    worker_p.join()
    bean_p.join()
    logger.info('done')


if __name__ == '__main__':
    main()
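The worker and beancounter above never exit on their own. One common way to shut such a pipeline down cleanly is a sentinel value. This is only a sketch, not part of the answer's code: `STOP`, `double`, and `run_pipeline` are hypothetical names, it assumes `None` is never a real job, and the parent process plays the beancounter role itself. The optional `ctx` argument is likewise an assumption, there only so a specific start method can be passed in.

```python
import multiprocessing

STOP = None  # hypothetical sentinel; assumes None is never a real job


def double(x):
    return x * 2


def worker(inqueue, outqueue):
    # iter(callable, sentinel) keeps calling inqueue.get() until it
    # returns STOP.
    for job in iter(inqueue.get, STOP):
        outqueue.put(double(job))
    outqueue.put(STOP)  # tell the consumer nothing more is coming


def run_pipeline(jobs, ctx=None):
    # Use the platform's default start method unless a context is given.
    ctx = ctx or multiprocessing.get_context()
    inqueue, outqueue = ctx.Queue(), ctx.Queue()
    proc = ctx.Process(target=worker, args=(inqueue, outqueue))
    proc.start()
    for job in jobs:
        inqueue.put(job)
    inqueue.put(STOP)
    # Drain the results before join(), so the worker is never left
    # waiting to flush data nobody reads.
    results = list(iter(outqueue.get, STOP))
    proc.join()
    return results


if __name__ == '__main__':
    print(run_pipeline(range(5)))  # [0, 2, 4, 6, 8]
```

With several workers, each one would need its own `STOP` on the input queue, and the consumer would have to count the sentinels it receives.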
I've got it. I do not know why, but when I tried "threading", it worked!
import logging
import threading
from queue import Queue, Empty

MailLogger = logging.getLogger('mail')


class TaskMaker(threading.Thread):
    def __init__(self, que):
        threading.Thread.__init__(self)
        self.queue = que

    def run(self):
        while True:
            try:
                print("start", self.queue.qsize())
                # Blocks until a task is available.
                _data = self.queue.get()
                if _data:
                    print("make")
                    _data['function'](*_data['args'], **_data['kwargs'])
            except Empty:
                pass
            except Exception as e:
                print(e)
                MailLogger.error(e)


# One shared queue and one background thread that runs queued tasks.
tasks = Queue()
stream = TaskMaker(tasks)
stream.start()


def add_task(func=lambda: None, args=(), kwargs=None):
    try:
        tasks.put_nowait({
            'function': func,
            'args': args,
            'kwargs': kwargs or {},
        })
    except Exception as e:
        print(e)
        MailLogger.error(e)
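With the code above, the caller has no way to tell when a queued task has finished, and the worker thread never stops. A possible variation, sketched here under assumptions (Python 3; `run_tasks` and `_STOP` are hypothetical names, and tasks are passed as `(func, args, kwargs)` tuples instead of dicts), uses `Queue.task_done()` and `Queue.join()` to wait for the work and a sentinel to end the thread.

```python
import logging
import queue
import threading

_STOP = object()  # hypothetical sentinel that tells the worker to exit


def run_tasks(task_list):
    """Run (func, args, kwargs) tuples on one worker thread, in order."""
    tasks = queue.Queue()
    results = []

    def worker():
        while True:
            item = tasks.get()  # blocks until something is queued
            try:
                if item is _STOP:
                    return
                func, args, kwargs = item
                results.append(func(*args, **kwargs))
            except Exception:
                # Log and keep going, so one bad task does not kill the thread.
                logging.exception('task failed')
            finally:
                tasks.task_done()  # lets tasks.join() make progress

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    for task in task_list:
        tasks.put(task)
    tasks.join()      # wait until every queued task has been processed
    tasks.put(_STOP)  # then ask the worker to stop
    thread.join()
    return results


if __name__ == '__main__':
    print(run_tasks([(pow, (2, 3), {}), (str.upper, ('ab',), {})]))
```

Because threads share memory, the worker can append straight to `results`; the same trick would not work across processes.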