使 Python 多处理池可等待

Making a Python multiprocessing Pool awaitable

我有一个应用程序需要根据 websocket 流执行一些处理器密集型工作。我想通过多处理并行化 CPU 密集位,但我仍然需要异步接口来处理应用程序的流式处理部分。为了解决这个问题,我希望制作一个可等待的 multiprocessing.AsyncResult 版本(multiprocessing.pool.Pool.submit_async 操作的结果)。但是,我 运行 遇到了一些奇怪的行为。

我的新可等待池结果(它是 asyncio.Future 的子类)工作正常,只要池结果在我开始等待之前返回即可。但是,如果我尝试在池结果返回之前等待它,那么程序似乎会在 await 语句上无限期地停止。

我已经用 next(future.async()) 检查了异步迭代器的结果,在池处理完成之前,迭代器 returns 是未来的实例本身,并且正如我所料,之后引发 StopIterationError。

代码如下。

import multiprocessing
import multiprocessing.pool
import asyncio
import time

class Awaitable_Multiprocessing_Pool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)

    def apply_awaitably(self, func, args = list(), kwargs = dict()):
        return Awaitable_Multiprocessing_Pool_Result(
                self,
                func,
                args,
                kwargs)

class Awaitable_Multiprocessing_Pool_Result(asyncio.Future):
    def __init__(self, pool, func, args = list(), kwargs = dict()):
        asyncio.Future.__init__(self)
        self.pool_result = pool.apply_async(
                func,
                args,
                kwargs,
                self.set_result,
                self.set_exception)

    def result(self):
        return self.pool_result.get()

    def done(self):
        return self.pool_result.ready()

def dummy_processing_fun():
    import time
    print('start processing')
    time.sleep(4)
    print('finished processing')
    return 'result'

if __name__ == '__main__':
    async def main():
        ah = Async_Handler(1)
        pool = Awaitable_Multiprocessing_Pool(2)
        while True:
            future = pool.apply_awaitably(dummy_processing_fun, [])
            # print(next(future.__await__())) # would show same as print(future)
            # print(await future) # would stall indefinitely because pool result isn't in
            time.sleep(10) # NOTE: you may have to make this longer to account for pool startup time on the first iteration
            # print(next(future.__await__())) # would raise StopIteration
            print(await future) # prints 'result'
    asyncio.run(main())

我是不是漏掉了什么明显的东西?我认为我拥有 awaitable 的所有基本要素,部分原因是我可以在某些情况下成功等待。有人有任何见解吗?

我不知道你为什么把它弄得这么复杂...下面的代码怎么样?

from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

def dummy_processing_fun():
    import time
    print('start processing')
    time.sleep(4)
    print('finished processing')
    return 'result'

if __name__ == '__main__':
    async def main():
        pool = ProcessPoolExecutor(2)
        while True:
            future = pool.submit(dummy_processing_fun)
            future = asyncio.wrap_future(future)
            # print(next(future.__await__())) # would show same as print(future)
            # print(await future) # would stall indefinitely because pool result isn't in
            # time.sleep(5)
            # print(next(future.__await__())) # would raise StopIteration
            print(await future) # prints 'result'
    asyncio.run(main())