使用 aiomultiprocess 在 Python 中进行多进程管理

Mulitprocess management in Python with aiomultiprocess

我在 Python 中遇到多处理问题。我需要创建异步进程,其中 运行 未定义的时间和进程数也未定义。一旦新请求到达,就必须使用请求中的规范创建一个新流程。我们使用 ZeroMQ 进行消息传递。还有一个从头开始的进程,只有在整个脚本终止时才结束。

现在我正在寻找一种解决方案,如何等待所有进程,同时能够添加其他进程。

asyncio.gather()

这是我的第一个想法,但它需要进程列表才能被调用。

class Object:
  def __init__(self, var):
     self.var = var

  async def run(self):
      *do async things*

class object_controller:
  
  def __init__(self):
     self.ctx = zmq.Context()
     self.socket = self.ctx.socket(zmq.PULL)
     self.socket.connect("tcp://127.0.0.1:5558")

     self.static_process = AStaticProcess()
     self.sp = aiomultiprocess.Process(target=self.static_process.run)
     self.sp.start()
     #here I need a good way to await this process


  def process(self, var):
    object = Object(var)
    process = aiomultiprocess.Process(target=object.run)
    process.start()
  
  def listener(self)
    while True:
      msg = self.socket.recv_pyobj()
      # here I need to find a way how I can start and await this process while beeing able to 
      # receive additional request, which result in additional processes which need to be awaited

这是一些有望解释我的问题的代码。我需要一种等待进程的收集器。

初始化之后,对象和控制器之间没有交互,只通过zeroMQ(静态进程和变量进程之间)。也没有return.

您需要为流程创建任务列表或未来对象。您也不能在等待其他任务时将进程添加到事件循环

如果您需要在等待新进程的同时启动进程,而不是显式调用 await 来知道进程何时完成,让它们使用 asyncio.create_task(). This will return a Task object, which has an add_done_callback 方法在后台执行,这进程完成后,您可以用来做一些工作:

class Object:
  def __init__(self, var):
     self.var = var

  async def run(self):
      *do async things*

class object_controller:
  
  def __init__(self):
     self.ctx = zmq.Context()
     self.socket = self.ctx.socket(zmq.PULL)
     self.socket.connect("tcp://127.0.0.1:5558")

     self.static_process = AStaticProcess()
     self.sp = aiomultiprocess.Process(target=self.static_process.run)
     self.sp.start()
     asyncio.create_task(self.sp.join() self.handle_proc_finished)


  def process(self, var):
    object = Object(var)
    process = aiomultiprocess.Process(target=object.run)
    process.start()
  
  def listener(self)
    while True:
      msg = self.socket.recv_pyobj()
      process = aiomultiprocess.Process(...)
      process.start()
      t = asyncio.create_task(process.join())
      t.add_done_callback(self.handle_other_proc_finished)

  def handle_proc_finished(self, task):
     # do something

  def handle_other_proc_finished(self, task):
    # do something else

如果您想避免使用回调,您还可以传递 create_task 一个您自己定义的协程,它会等待进程完成并执行之后需要完成的任何事情。

self.sp.start()
asyncio.create_task(wait_for_proc(self.sp))

async def wait_for_proc(proc):
   await proc.join()
   # do other stuff