Python 多处理池队列通信
Python multiprocessing Pool Queues communication
我正在尝试实现一个包含两个进程的池,这两个进程 运行 并行并通过队列进行通信。
目标是让一个 writer 进程通过使用 队列.
每个进程都在终端上打印反馈,以便得到反馈。
代码如下:
#!/usr/bin/env python
import os
import time
import multiprocessing as mp
import Queue
def writer(queue):
pid = os.getpid()
for i in range(1,4):
msg = i
print "### writer ", pid, " -> ", msg
queue.put(msg)
time.sleep(1)
msg = 'Done'
print '### '+msg
queue.put(msg)
def reader(queue):
pid = os.getpid()
time.sleep(0.5)
while True:
print "--- reader ", pid, " -> ",
msg = queue.get()
print msg
if msg == 'Done':
break
if __name__ == "__main__":
print "Initialize the experiment PID: ", os.getpid()
mp.freeze_support()
queue = mp.Queue()
pool = mp.Pool()
pool.apply_async(writer, (queue))
pool.apply_async(reader, (queue))
pool.close()
pool.join()
我期望的输出应该是这样的:
Initialize the experiment PID: 2341
writer 2342 -> 1
reader 2343 -> 1
writer 2342 -> 2
reader 2343 -> 2
writer 2342 -> 3
reader 2343 -> 3
Done
但是我只得到一行:
Initialize the experiment PID: 2341
然后脚本退出。
通过队列通信的池中两个进程的进程间通信的正确实现方式是什么?
我用mp.Manager().Queue()
作为队列,因为不能直接传Queue
。尝试直接使用 Queue
导致异常,但由于我们使用 apply_async
而未得到处理。
我将您的代码更新为:
#!/usr/bin/env python
import os
import time
import multiprocessing as mp
import Queue
def writer(queue):
pid = os.getpid()
for i in range(1,4):
msg = i
print "### writer ", pid, " -> ", msg
queue.put(msg)
time.sleep(1)
msg = 'Done'
print '### '+msg
queue.put(msg)
def reader(queue):
pid = os.getpid()
time.sleep(0.5)
while True:
print "--- reader ", pid, " -> ",
msg = queue.get()
print msg
if msg == 'Done':
break
if __name__ == "__main__":
print "Initialize the experiment PID: ", os.getpid()
manager = mp.Manager()
queue = manager.Queue()
pool = mp.Pool()
pool.apply_async(writer, (queue,))
pool.apply_async(reader, (queue,))
pool.close()
pool.join()
我得到了这个输出:
Initialize the experiment PID: 46182
### writer 46210 -> 1
--- reader 46211 -> 1
### writer 46210 -> 2
--- reader 46211 -> 2
### writer 46210 -> 3
--- reader 46211 -> 3
### Done
--- reader 46211 -> Done
相信这就是您所期望的。
我正在尝试实现一个包含两个进程的池,这两个进程 运行 并行并通过队列进行通信。
目标是让一个 writer 进程通过使用 队列.
每个进程都在终端上打印反馈,以便得到反馈。
代码如下:
#!/usr/bin/env python
import os
import time
import multiprocessing as mp
import Queue
def writer(queue):
pid = os.getpid()
for i in range(1,4):
msg = i
print "### writer ", pid, " -> ", msg
queue.put(msg)
time.sleep(1)
msg = 'Done'
print '### '+msg
queue.put(msg)
def reader(queue):
pid = os.getpid()
time.sleep(0.5)
while True:
print "--- reader ", pid, " -> ",
msg = queue.get()
print msg
if msg == 'Done':
break
if __name__ == "__main__":
print "Initialize the experiment PID: ", os.getpid()
mp.freeze_support()
queue = mp.Queue()
pool = mp.Pool()
pool.apply_async(writer, (queue))
pool.apply_async(reader, (queue))
pool.close()
pool.join()
我期望的输出应该是这样的:
Initialize the experiment PID: 2341
writer 2342 -> 1
reader 2343 -> 1
writer 2342 -> 2
reader 2343 -> 2
writer 2342 -> 3
reader 2343 -> 3
Done
但是我只得到一行:
Initialize the experiment PID: 2341
然后脚本退出。
通过队列通信的池中两个进程的进程间通信的正确实现方式是什么?
我用mp.Manager().Queue()
作为队列,因为不能直接传Queue
。尝试直接使用 Queue
导致异常,但由于我们使用 apply_async
而未得到处理。
我将您的代码更新为:
#!/usr/bin/env python
import os
import time
import multiprocessing as mp
import Queue
def writer(queue):
pid = os.getpid()
for i in range(1,4):
msg = i
print "### writer ", pid, " -> ", msg
queue.put(msg)
time.sleep(1)
msg = 'Done'
print '### '+msg
queue.put(msg)
def reader(queue):
pid = os.getpid()
time.sleep(0.5)
while True:
print "--- reader ", pid, " -> ",
msg = queue.get()
print msg
if msg == 'Done':
break
if __name__ == "__main__":
print "Initialize the experiment PID: ", os.getpid()
manager = mp.Manager()
queue = manager.Queue()
pool = mp.Pool()
pool.apply_async(writer, (queue,))
pool.apply_async(reader, (queue,))
pool.close()
pool.join()
我得到了这个输出:
Initialize the experiment PID: 46182
### writer 46210 -> 1
--- reader 46211 -> 1
### writer 46210 -> 2
--- reader 46211 -> 2
### writer 46210 -> 3
--- reader 46211 -> 3
### Done
--- reader 46211 -> Done
相信这就是您所期望的。