multiprocessing.Pipe是否必须通过继承传递给子进程

Does multiprocessing.Pipe have to be passed to subprocess through inheritance

我理解 multiprocessing.Queue has to be passed to subprocess through inheritance。但是,当我尝试通过消息传递将 Pipe 传递给子进程时,就像下面的代码一样,我得到的错误并不是说 "Pipe can only be shared between processes through inheritance"。相反,它在 q.get() 处失败并且错误显示 TypeError: Required argument 'handle' (pos 1) not found。我想知道是否有可能这样做?假设管道是使用 linux 命名管道实现的,那么重要的是管道的名称,它可能是要序列化并在进程之间传递的状态,对吧?

from multiprocessing import Process, Pipe, Queue    

def reader(q):
  output_p = q.get()
  msg = output_p.recv()
  while msg is not None:
    msg = output_p.recv()    

if __name__ == '__main__':
    q = Queue()
    reader_p = Process(target=reader, args=(q,))
    reader_p.start()     # Launch the reader process

    output_p, input_p = Pipe(True)
    q.put(output_p)

    input_p.send('MyMessage')
    input_p.send(None)
    reader_p.join()

这是 bug,已在 Python 3.

中修复

您在 Python 3 中的代码可以完美运行。

noxadofox 在这里给出了正确的答案。我正在添加一个我设计的示例来验证管道不需要继承。在此示例中,我在执行程序启动其两个进程后创建了第二个管道,并将其作为参数传递给现有进程。

""" Multiprocessing pipe and queue test """
import multiprocessing
import concurrent.futures
import time


class Example:
    def __init__(self):
        manager = multiprocessing.Manager()
        q = manager.Queue()
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)

        pipe_out_1, pipe_in_1 = multiprocessing.Pipe(duplex=True)
        executor.submit(self.requester, q, pipe_in_1)
        executor.submit(self.worker, q, pipe_out_1)
        print(executor._processes)

        pipe_out_2, pipe_in_2 = multiprocessing.Pipe(duplex=True)
        executor.submit(self.requester, q, pipe_in_2)
        executor.submit(self.worker, q, pipe_out_2)
        print(executor._processes)

    @staticmethod
    def worker(q, pipe_out):
        task = q.get()
        print('worker got task {}'.format(task))
        pipe_out.send(task + '-RESPONSE')
        print('loop_proc sent')

    @staticmethod
    def requester(q, pipe_in):
        q.put('TASK')
        response = pipe_in.recv()
        print('requester got response {}'.format(response))
        time.sleep(2)


if __name__ == '__main__':
    Example()
    time.sleep(30)