Making a Python multiprocessing Pool awaitable
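I have an application that needs to do some processor-intensive work driven by a websocket stream. I'd like to parallelize the CPU-heavy parts with multiprocessing, but I still need an async interface for the streaming part of the application. To handle this, I'm trying to build an awaitable version of multiprocessing.pool.AsyncResult (the object returned by multiprocessing.pool.Pool.apply_async). However, I've run into some strange behavior.
My new awaitable pool result (a subclass of asyncio.Future) works fine as long as the pool result comes back before I start awaiting it. If I await it before the pool result has arrived, though, the program appears to hang indefinitely on the await statement.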
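I've inspected the awaitable's iterator with next(future.__await__()): before the pool processing finishes, the iterator returns the future instance itself, and afterwards it raises StopIteration, just as I expected.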
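The behavior described above is the normal asyncio.Future await protocol. A minimal sketch using a plain future from loop.create_future() (no pool involved; the function name inspect_await_protocol is hypothetical) reproduces both observations:

```python
import asyncio


async def inspect_await_protocol():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # While the future is pending, its __await__ iterator yields the future
    # itself; that is how the running Task learns what it must wait for.
    first = next(fut.__await__())

    # Once a result is set, a fresh __await__ iterator finishes immediately:
    # StopIteration carries the result as its value.
    fut.set_result('result')
    try:
        next(fut.__await__())
    except StopIteration as stop:
        value = stop.value
    return first is fut, value


identity, value = asyncio.run(inspect_await_protocol())
print(identity, value)  # True result
```

So the iterator behaving this way only shows that the future's state is correct; it says nothing about whether the event loop is ever told to resume the waiting task.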
Here is the code:
import multiprocessing
import multiprocessing.pool
import asyncio
import time


class Awaitable_Multiprocessing_Pool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)

    def apply_awaitably(self, func, args=(), kwargs=None):
        return Awaitable_Multiprocessing_Pool_Result(
            self,
            func,
            args,
            kwargs)


class Awaitable_Multiprocessing_Pool_Result(asyncio.Future):
    def __init__(self, pool, func, args=(), kwargs=None):
        asyncio.Future.__init__(self)
        # Pass the future's own set_result / set_exception to the pool
        # as its callback / error_callback.
        self.pool_result = pool.apply_async(
            func,
            args,
            kwargs or {},
            self.set_result,
            self.set_exception)

    def result(self):
        return self.pool_result.get()

    def done(self):
        return self.pool_result.ready()


def dummy_processing_fun():
    print('start processing')
    time.sleep(4)
    print('finished processing')
    return 'result'


if __name__ == '__main__':
    async def main():
        pool = Awaitable_Multiprocessing_Pool(2)
        while True:
            future = pool.apply_awaitably(dummy_processing_fun, [])
            # print(next(future.__await__()))  # would show same as print(future)
            # print(await future)  # would stall indefinitely because pool result isn't in
            time.sleep(10)  # NOTE: you may have to make this longer to account for pool startup time on the first iteration
            # print(next(future.__await__()))  # would raise StopIteration
            print(await future)  # prints 'result'
    asyncio.run(main())
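Am I missing something obvious? I think I have all the basic ingredients of an awaitable, not least because awaiting it does succeed in some cases. Does anyone have any insight?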
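A likely cause of the stall: Pool.apply_async runs callback and error_callback on the pool's internal result-handler thread, not on the event loop's thread. asyncio.Future is not thread-safe, so calling set_result from that thread marks the future done but schedules the waiting task's wake-up with plain call_soon, which does not wake a loop that is blocked waiting for I/O. That fits the observation that awaiting only works when the result arrived before the await. Below is a minimal sketch, assuming a loop-owned future is acceptable instead of subclassing asyncio.Future; the helper apply_awaitably (borrowing the question's method name) and the worker square are hypothetical names:

```python
import asyncio
import multiprocessing
import multiprocessing.pool


def apply_awaitably(pool, func, args=(), kwds=None):
    """Submit func to a multiprocessing pool and return an asyncio future.

    Hypothetical helper. Pool callbacks run on the pool's result-handler
    thread, so they hand results to the loop via call_soon_threadsafe,
    which also wakes the loop up.
    """
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def _set_result(value):
        # Skip if the awaiting side cancelled the future in the meantime.
        if not fut.done():
            fut.set_result(value)

    def _set_exception(exc):
        if not fut.done():
            fut.set_exception(exc)

    pool.apply_async(
        func, args, kwds or {},
        callback=lambda value: loop.call_soon_threadsafe(_set_result, value),
        error_callback=lambda exc: loop.call_soon_threadsafe(_set_exception, exc),
    )
    return fut


def square(x):
    # Hypothetical stand-in for the CPU-bound work.
    return x * x


async def main():
    with multiprocessing.Pool(2) as pool:
        futures = [apply_awaitably(pool, square, (i,)) for i in range(4)]
        print(await asyncio.gather(*futures))  # [0, 1, 4, 9]


if __name__ == '__main__':
    asyncio.run(main())
```

The same helper works unchanged with multiprocessing.pool.ThreadPool, since it shares Pool's apply_async and callback machinery.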
I'm not sure why you're making this so complicated... How about the following?
from concurrent.futures import ProcessPoolExecutor
import asyncio
import time


def dummy_processing_fun():
    print('start processing')
    time.sleep(4)
    print('finished processing')
    return 'result'


if __name__ == '__main__':
    async def main():
        pool = ProcessPoolExecutor(2)
        while True:
            # submit() returns a concurrent.futures.Future ...
            future = pool.submit(dummy_processing_fun)
            # ... and wrap_future turns it into an asyncio future whose completion
            # is delivered to the event loop thread-safely, so awaiting it
            # immediately does not stall.
            future = asyncio.wrap_future(future)
            print(await future)  # prints 'result' once the worker finishes
    asyncio.run(main())
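A closely related option is loop.run_in_executor, which submits to the executor and hands back an asyncio future in one step. The sketch below also uses asyncio.gather so several jobs run in parallel, rather than one submit/await round trip per loop iteration. The names cpu_heavy and process_batch are hypothetical placeholders for the real workload:

```python
import asyncio
import concurrent.futures


def cpu_heavy(n):
    # Hypothetical stand-in for the CPU-bound work: sum of squares below n.
    return sum(i * i for i in range(n))


async def process_batch(executor, items):
    loop = asyncio.get_running_loop()
    # run_in_executor returns an asyncio future directly,
    # so no explicit asyncio.wrap_future call is needed.
    tasks = [loop.run_in_executor(executor, cpu_heavy, n) for n in items]
    # gather awaits all jobs together, so they run in parallel in the pool.
    return await asyncio.gather(*tasks)


async def main():
    with concurrent.futures.ProcessPoolExecutor(2) as pool:
        print(await process_batch(pool, [10, 100, 1000]))  # [285, 328350, 332833500]


if __name__ == '__main__':
    asyncio.run(main())
```

Passing the executor in as a parameter keeps process_batch independent of the pool type, so a ThreadPoolExecutor can be swapped in for I/O-bound work or for testing.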