在主进程中异步等待多处理队列
Asynchronously wait for multiprocessing Queue in main process
我有以下场景:多个工作进程将有关其当前状态的事件发送到事件调度程序。如果我们在主进程中,则此事件调度程序需要处理所有事件,或者如果我们在工作进程中,则向主进程的事件调度程序发出信号以处理这些事件。
这里的主要症结是事件处理也必须在主进程的主线程中,所以我不能只是运行 while True loop在一个线程中等待来自工作进程的消息。
所以我有这个:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import current_process, Process, Queue
from threading import current_thread
from time import sleep
def get_q(q):
print("Waiting for the queue ({} / {})\n".format(current_thread().name, current_process().name))
return q.get()
async def message_q(q):
while True:
f = loop.run_in_executor(None, get_q, q)
await f
if f.result() is None:
print("Done")
return;
print("Got the result ({} / {})".format(current_thread().name, current_process().name))
print("Result is: {}\n".format(f.result()))
async def something_else():
while True:
print("Something else\n")
await asyncio.sleep(2)
def other_process(q):
for i in range(5):
print("Putting something in the queue ({})".format(current_process().name))
q.put(i)
sleep(1)
q.put(None)
q = Queue()
Process(target=other_process, args=(q,), daemon=True).start()
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
asyncio.ensure_future(message_q(q))
asyncio.ensure_future(something_else())
loop.run_until_complete(asyncio.sleep(6))
other_process()
是一个示例性的工作进程,它使用队列向主进程发出信号,运行 是一个事件循环来处理内容并等待队列中的任何数据。在实际情况下,此进程会向事件调度程序发出信号,事件调度程序随后会处理队列消息,将消息传递给主进程事件调度程序,但这里我稍微简化了它。
不过,我对此不是很满意。一次又一次地向 ThreadPoolExecutor
提交 get_q()
会产生更多的开销,并且不如一个 long-运行ning 线程那么干净。 await f
也不是最优的,一旦队列中没有更多数据就会阻塞,这会阻止事件循环退出。我的解决方法是在工作人员完成后发送 None
,如果 None
在队列中则退出 message_q()
。
有没有更好的方法来实现这个?性能非常重要,我想将 Queue 对象保留在事件调度程序的本地,而不是将其传递给管理工作进程的代码(或需要调用其中的某种 finalize()
方法)。
我现在将其实现为异步上下文管理器。上下文管理器调用
asyncio.ensure_future(message_q())
在其__aenter__()
方法中,并在其__aexit__()
方法中将None
添加到队列中以关闭message_q()
中的无限循环。
上下文管理器可以在进程生成代码部分周围的 async with
语句中使用,从而无需手动调用关闭方法。但是,建议在确保 message_q()
协程允许上下文管理器初始化队列侦听器后,在 __aenter__()
方法中调用 await asyncio.sleep(0)
。否则,message_q()
不会立即被调用。这本身不是问题(因为队列已满),但它会延迟事件转发,直到代码中出现下一个 await
。
应该使用 ProcessPoolExecutor
和 loop.run_in_executor()
一起生成进程,因此等待进程不会阻塞事件循环。
除了使用 Queue,您可能还想使用 JoinableQueue 来确保在退出上下文管理器之前所有事件都已处理。
我有以下场景:多个工作进程将有关其当前状态的事件发送到事件调度程序。如果我们在主进程中,则此事件调度程序需要处理所有事件,或者如果我们在工作进程中,则向主进程的事件调度程序发出信号以处理这些事件。
这里的主要症结是事件处理也必须在主进程的主线程中,所以我不能只是运行 while True loop在一个线程中等待来自工作进程的消息。
所以我有这个:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import current_process, Process, Queue
from threading import current_thread
from time import sleep
def get_q(q):
print("Waiting for the queue ({} / {})\n".format(current_thread().name, current_process().name))
return q.get()
async def message_q(q):
while True:
f = loop.run_in_executor(None, get_q, q)
await f
if f.result() is None:
print("Done")
return;
print("Got the result ({} / {})".format(current_thread().name, current_process().name))
print("Result is: {}\n".format(f.result()))
async def something_else():
while True:
print("Something else\n")
await asyncio.sleep(2)
def other_process(q):
for i in range(5):
print("Putting something in the queue ({})".format(current_process().name))
q.put(i)
sleep(1)
q.put(None)
q = Queue()
Process(target=other_process, args=(q,), daemon=True).start()
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
asyncio.ensure_future(message_q(q))
asyncio.ensure_future(something_else())
loop.run_until_complete(asyncio.sleep(6))
other_process()
是一个示例性的工作进程,它使用队列向主进程发出信号,运行 是一个事件循环来处理内容并等待队列中的任何数据。在实际情况下,此进程会向事件调度程序发出信号,事件调度程序随后会处理队列消息,将消息传递给主进程事件调度程序,但这里我稍微简化了它。
不过,我对此不是很满意。一次又一次地向 ThreadPoolExecutor
提交 get_q()
会产生更多的开销,并且不如一个 long-运行ning 线程那么干净。 await f
也不是最优的,一旦队列中没有更多数据就会阻塞,这会阻止事件循环退出。我的解决方法是在工作人员完成后发送 None
,如果 None
在队列中则退出 message_q()
。
有没有更好的方法来实现这个?性能非常重要,我想将 Queue 对象保留在事件调度程序的本地,而不是将其传递给管理工作进程的代码(或需要调用其中的某种 finalize()
方法)。
我现在将其实现为异步上下文管理器。上下文管理器调用
asyncio.ensure_future(message_q())
在其__aenter__()
方法中,并在其__aexit__()
方法中将None
添加到队列中以关闭message_q()
中的无限循环。
上下文管理器可以在进程生成代码部分周围的 async with
语句中使用,从而无需手动调用关闭方法。但是,建议在确保 message_q()
协程允许上下文管理器初始化队列侦听器后,在 __aenter__()
方法中调用 await asyncio.sleep(0)
。否则,message_q()
不会立即被调用。这本身不是问题(因为队列已满),但它会延迟事件转发,直到代码中出现下一个 await
。
应该使用 ProcessPoolExecutor
和 loop.run_in_executor()
一起生成进程,因此等待进程不会阻塞事件循环。
除了使用 Queue,您可能还想使用 JoinableQueue 来确保在退出上下文管理器之前所有事件都已处理。