使用 aiomultiprocess 在 Python 中进行多进程管理
Mulitprocess management in Python with aiomultiprocess
我在 Python 中遇到多处理问题。我需要创建异步进程,其中 运行 未定义的时间和进程数也未定义。一旦新请求到达,就必须使用请求中的规范创建一个新流程。我们使用 ZeroMQ 进行消息传递。还有一个从头开始的进程,只有在整个脚本终止时才结束。
现在我正在寻找一种解决方案,如何等待所有进程,同时能够添加其他进程。
asyncio.gather()
这是我的第一个想法,但它需要进程列表才能被调用。
class Object:
def __init__(self, var):
self.var = var
async def run(self):
*do async things*
class object_controller:
def __init__(self):
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.PULL)
self.socket.connect("tcp://127.0.0.1:5558")
self.static_process = AStaticProcess()
self.sp = aiomultiprocess.Process(target=self.static_process.run)
self.sp.start()
#here I need a good way to await this process
def process(self, var):
object = Object(var)
process = aiomultiprocess.Process(target=object.run)
process.start()
def listener(self)
while True:
msg = self.socket.recv_pyobj()
# here I need to find a way how I can start and await this process while beeing able to
# receive additional request, which result in additional processes which need to be awaited
这是一些有望解释我的问题的代码。我需要一种等待进程的收集器。
初始化之后,对象和控制器之间没有交互,只通过zeroMQ(静态进程和变量进程之间)。也没有return.
您需要为流程创建任务列表或未来对象。您也不能在等待其他任务时将进程添加到事件循环
如果您需要在等待新进程的同时启动进程,而不是显式调用 await
来知道进程何时完成,让它们使用 asyncio.create_task()
. This will return a Task
object, which has an add_done_callback
方法在后台执行,这进程完成后,您可以用来做一些工作:
class Object:
def __init__(self, var):
self.var = var
async def run(self):
*do async things*
class object_controller:
def __init__(self):
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.PULL)
self.socket.connect("tcp://127.0.0.1:5558")
self.static_process = AStaticProcess()
self.sp = aiomultiprocess.Process(target=self.static_process.run)
self.sp.start()
asyncio.create_task(self.sp.join() self.handle_proc_finished)
def process(self, var):
object = Object(var)
process = aiomultiprocess.Process(target=object.run)
process.start()
def listener(self)
while True:
msg = self.socket.recv_pyobj()
process = aiomultiprocess.Process(...)
process.start()
t = asyncio.create_task(process.join())
t.add_done_callback(self.handle_other_proc_finished)
def handle_proc_finished(self, task):
# do something
def handle_other_proc_finished(self, task):
# do something else
如果您想避免使用回调,您还可以传递 create_task
一个您自己定义的协程,它会等待进程完成并执行之后需要完成的任何事情。
self.sp.start()
asyncio.create_task(wait_for_proc(self.sp))
async def wait_for_proc(proc):
await proc.join()
# do other stuff
我在 Python 中遇到多处理问题。我需要创建异步进程,其中 运行 未定义的时间和进程数也未定义。一旦新请求到达,就必须使用请求中的规范创建一个新流程。我们使用 ZeroMQ 进行消息传递。还有一个从头开始的进程,只有在整个脚本终止时才结束。
现在我正在寻找一种解决方案,如何等待所有进程,同时能够添加其他进程。
asyncio.gather()
这是我的第一个想法,但它需要进程列表才能被调用。
class Object:
def __init__(self, var):
self.var = var
async def run(self):
*do async things*
class object_controller:
def __init__(self):
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.PULL)
self.socket.connect("tcp://127.0.0.1:5558")
self.static_process = AStaticProcess()
self.sp = aiomultiprocess.Process(target=self.static_process.run)
self.sp.start()
#here I need a good way to await this process
def process(self, var):
object = Object(var)
process = aiomultiprocess.Process(target=object.run)
process.start()
def listener(self)
while True:
msg = self.socket.recv_pyobj()
# here I need to find a way how I can start and await this process while beeing able to
# receive additional request, which result in additional processes which need to be awaited
这是一些有望解释我的问题的代码。我需要一种等待进程的收集器。
初始化之后,对象和控制器之间没有交互,只通过zeroMQ(静态进程和变量进程之间)。也没有return.
您需要为流程创建任务列表或未来对象。您也不能在等待其他任务时将进程添加到事件循环
如果您需要在等待新进程的同时启动进程,而不是显式调用 await
来知道进程何时完成,让它们使用 asyncio.create_task()
. This will return a Task
object, which has an add_done_callback
方法在后台执行,这进程完成后,您可以用来做一些工作:
class Object:
def __init__(self, var):
self.var = var
async def run(self):
*do async things*
class object_controller:
def __init__(self):
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.PULL)
self.socket.connect("tcp://127.0.0.1:5558")
self.static_process = AStaticProcess()
self.sp = aiomultiprocess.Process(target=self.static_process.run)
self.sp.start()
asyncio.create_task(self.sp.join() self.handle_proc_finished)
def process(self, var):
object = Object(var)
process = aiomultiprocess.Process(target=object.run)
process.start()
def listener(self)
while True:
msg = self.socket.recv_pyobj()
process = aiomultiprocess.Process(...)
process.start()
t = asyncio.create_task(process.join())
t.add_done_callback(self.handle_other_proc_finished)
def handle_proc_finished(self, task):
# do something
def handle_other_proc_finished(self, task):
# do something else
如果您想避免使用回调,您还可以传递 create_task
一个您自己定义的协程,它会等待进程完成并执行之后需要完成的任何事情。
self.sp.start()
asyncio.create_task(wait_for_proc(self.sp))
async def wait_for_proc(proc):
await proc.join()
# do other stuff