来自一个多处理管理器的多个队列
Multiple queues from one multiprocessing Manager
我正在编写一个将使用 python 的多处理和线程模块的脚本。
为了您的理解,我生成了与可用核心一样多的进程,并且在我启动的每个进程中,例如25 个线程。
每个线程从 input_queue
中消耗并生成 output_queue
。
对于队列对象,我使用 multiprocessing.Queue
。
在我的第一次测试后,我遇到了死锁,因为负责填充和刷新队列的线程挂起。一段时间后,我发现我可以使用 Queue().cancel_join_thread()
来解决这个问题。
但是由于数据丢失的可能性我想使用:multiprocessing.Manager().Queue()
现在实际问题:
为每个队列使用一个管理器对象是否更好?还是我应该创建一个管理器并从同一个管理器对象中获取两个问题?
# One manager for all queues
import multiprocessing
manager = multiprocessing.Manager()
input_queue = manager.Queue()
output_queue = manager.Queue()
...Magic...
# As much managers as queues
manager_in = multiprocessing.Manager()
queue_in = manager_in.Queue()
manager_out = multiprocessing.Manager()
queue_out = manager_out.Queue()
...Magic...
感谢您的帮助。
无需使用两个单独的 Manager
对象。正如您已经看到的,Manager
对象允许在多个进程之间共享对象;来自 docs:
Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
因此,如果您有两个不同的队列,您仍然可以使用同一个管理器。如果它对某人有帮助,这里是一个简单的例子,使用两个队列和一个经理:
from multiprocessing import Manager, Process
import time
class Worker(Process):
"""
Simple worker.
"""
def __init__(self, name, in_queue, out_queue):
super(Worker, self).__init__()
self.name = name
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
while True:
# grab work; do something to it (+1); then put the result on the output queue
work = self.in_queue.get()
print("{} got {}".format(self.name, work))
work += 1
# sleep to allow the other workers a chance (b/c the work action is too simple)
time.sleep(1)
# put the transformed work on the queue
print("{} puts {}".format(self.name, work))
self.out_queue.put(work)
if __name__ == "__main__":
# construct the queues
manager = Manager()
inq = manager.Queue()
outq = manager.Queue()
# construct the workers
workers = [Worker(str(name), inq, outq) for name in range(3)]
for worker in workers:
worker.start()
# add data to the queue for processing
work_len = 10
for x in range(work_len):
inq.put(x)
while outq.qsize() != work_len:
# waiting for workers to finish
print("Waiting for workers. Out queue size {}".format(outq.qsize()))
time.sleep(1)
# clean up
for worker in workers:
worker.terminate()
# print the outputs
while not outq.empty():
print(outq.get())
像这样使用两个管理器:
# construct the queues
manager1 = Manager()
inq = manager1.Queue()
manager2 = Manager()
outq = manager2.Queue()
可以,但没有必要。
我正在编写一个将使用 python 的多处理和线程模块的脚本。
为了您的理解,我生成了与可用核心一样多的进程,并且在我启动的每个进程中,例如25 个线程。
每个线程从 input_queue
中消耗并生成 output_queue
。
对于队列对象,我使用 multiprocessing.Queue
。
在我的第一次测试后,我遇到了死锁,因为负责填充和刷新队列的线程挂起。一段时间后,我发现我可以使用 Queue().cancel_join_thread()
来解决这个问题。
但是由于数据丢失的可能性我想使用:multiprocessing.Manager().Queue()
现在实际问题: 为每个队列使用一个管理器对象是否更好?还是我应该创建一个管理器并从同一个管理器对象中获取两个问题?
# One manager for all queues
import multiprocessing
manager = multiprocessing.Manager()
input_queue = manager.Queue()
output_queue = manager.Queue()
...Magic...
# As much managers as queues
manager_in = multiprocessing.Manager()
queue_in = manager_in.Queue()
manager_out = multiprocessing.Manager()
queue_out = manager_out.Queue()
...Magic...
感谢您的帮助。
无需使用两个单独的 Manager
对象。正如您已经看到的,Manager
对象允许在多个进程之间共享对象;来自 docs:
Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
因此,如果您有两个不同的队列,您仍然可以使用同一个管理器。如果它对某人有帮助,这里是一个简单的例子,使用两个队列和一个经理:
from multiprocessing import Manager, Process
import time
class Worker(Process):
"""
Simple worker.
"""
def __init__(self, name, in_queue, out_queue):
super(Worker, self).__init__()
self.name = name
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
while True:
# grab work; do something to it (+1); then put the result on the output queue
work = self.in_queue.get()
print("{} got {}".format(self.name, work))
work += 1
# sleep to allow the other workers a chance (b/c the work action is too simple)
time.sleep(1)
# put the transformed work on the queue
print("{} puts {}".format(self.name, work))
self.out_queue.put(work)
if __name__ == "__main__":
# construct the queues
manager = Manager()
inq = manager.Queue()
outq = manager.Queue()
# construct the workers
workers = [Worker(str(name), inq, outq) for name in range(3)]
for worker in workers:
worker.start()
# add data to the queue for processing
work_len = 10
for x in range(work_len):
inq.put(x)
while outq.qsize() != work_len:
# waiting for workers to finish
print("Waiting for workers. Out queue size {}".format(outq.qsize()))
time.sleep(1)
# clean up
for worker in workers:
worker.terminate()
# print the outputs
while not outq.empty():
print(outq.get())
像这样使用两个管理器:
# construct the queues
manager1 = Manager()
inq = manager1.Queue()
manager2 = Manager()
outq = manager2.Queue()
可以,但没有必要。