multiprocessing.Pipe是否必须通过继承传递给子进程
Does multiprocessing.Pipe have to be passed to subprocess through inheritance
我理解 multiprocessing.Queue
has to be passed to subprocess through inheritance。但是,当我尝试通过消息传递将 Pipe
传递给子进程时,就像下面的代码一样,我得到的错误并不是说 "Pipe can only be shared between processes through inheritance"。相反,它在 q.get()
处失败并且错误显示 TypeError: Required argument 'handle' (pos 1) not found
。我想知道是否有可能这样做?假设管道是使用 linux 命名管道实现的,那么重要的是管道的名称,它可能是要序列化并在进程之间传递的状态,对吧?
from multiprocessing import Process, Pipe, Queue
def reader(q):
output_p = q.get()
msg = output_p.recv()
while msg is not None:
msg = output_p.recv()
if __name__ == '__main__':
q = Queue()
reader_p = Process(target=reader, args=(q,))
reader_p.start() # Launch the reader process
output_p, input_p = Pipe(True)
q.put(output_p)
input_p.send('MyMessage')
input_p.send(None)
reader_p.join()
这是 bug,已在 Python 3.
中修复
您在 Python 3 中的代码可以完美运行。
noxadofox 在这里给出了正确的答案。我正在添加一个我设计的示例来验证管道不需要继承。在此示例中,我在执行程序启动其两个进程后创建了第二个管道,并将其作为参数传递给现有进程。
""" Multiprocessing pipe and queue test """
import multiprocessing
import concurrent.futures
import time
class Example:
def __init__(self):
manager = multiprocessing.Manager()
q = manager.Queue()
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
pipe_out_1, pipe_in_1 = multiprocessing.Pipe(duplex=True)
executor.submit(self.requester, q, pipe_in_1)
executor.submit(self.worker, q, pipe_out_1)
print(executor._processes)
pipe_out_2, pipe_in_2 = multiprocessing.Pipe(duplex=True)
executor.submit(self.requester, q, pipe_in_2)
executor.submit(self.worker, q, pipe_out_2)
print(executor._processes)
@staticmethod
def worker(q, pipe_out):
task = q.get()
print('worker got task {}'.format(task))
pipe_out.send(task + '-RESPONSE')
print('loop_proc sent')
@staticmethod
def requester(q, pipe_in):
q.put('TASK')
response = pipe_in.recv()
print('requester got response {}'.format(response))
time.sleep(2)
if __name__ == '__main__':
Example()
time.sleep(30)
我理解 multiprocessing.Queue
has to be passed to subprocess through inheritance。但是,当我尝试通过消息传递将 Pipe
传递给子进程时,就像下面的代码一样,我得到的错误并不是说 "Pipe can only be shared between processes through inheritance"。相反,它在 q.get()
处失败并且错误显示 TypeError: Required argument 'handle' (pos 1) not found
。我想知道是否有可能这样做?假设管道是使用 linux 命名管道实现的,那么重要的是管道的名称,它可能是要序列化并在进程之间传递的状态,对吧?
from multiprocessing import Process, Pipe, Queue
def reader(q):
output_p = q.get()
msg = output_p.recv()
while msg is not None:
msg = output_p.recv()
if __name__ == '__main__':
q = Queue()
reader_p = Process(target=reader, args=(q,))
reader_p.start() # Launch the reader process
output_p, input_p = Pipe(True)
q.put(output_p)
input_p.send('MyMessage')
input_p.send(None)
reader_p.join()
这是 bug,已在 Python 3.
中修复您在 Python 3 中的代码可以完美运行。
noxadofox 在这里给出了正确的答案。我正在添加一个我设计的示例来验证管道不需要继承。在此示例中,我在执行程序启动其两个进程后创建了第二个管道,并将其作为参数传递给现有进程。
""" Multiprocessing pipe and queue test """
import multiprocessing
import concurrent.futures
import time
class Example:
def __init__(self):
manager = multiprocessing.Manager()
q = manager.Queue()
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
pipe_out_1, pipe_in_1 = multiprocessing.Pipe(duplex=True)
executor.submit(self.requester, q, pipe_in_1)
executor.submit(self.worker, q, pipe_out_1)
print(executor._processes)
pipe_out_2, pipe_in_2 = multiprocessing.Pipe(duplex=True)
executor.submit(self.requester, q, pipe_in_2)
executor.submit(self.worker, q, pipe_out_2)
print(executor._processes)
@staticmethod
def worker(q, pipe_out):
task = q.get()
print('worker got task {}'.format(task))
pipe_out.send(task + '-RESPONSE')
print('loop_proc sent')
@staticmethod
def requester(q, pipe_in):
q.put('TASK')
response = pipe_in.recv()
print('requester got response {}'.format(response))
time.sleep(2)
if __name__ == '__main__':
Example()
time.sleep(30)