如何正确使用 asyncio 作为生成器函数?

How to use asyncio properly for a generator function?

我一次读取数千个文件,对于每个文件,我需要在从每个文件生成行之前对其执行操作。为了提高性能,我想我可以使用 asyncio 在等待读入新文件的同时对文件(和生成行)执行操作。

然而,从打印语句中我可以看到所有文件都已打开并收集,然后迭代每个文件(与没有 asyncio 时发生的情况相同)。

我觉得我在这里遗漏了一些非常明显的东西,这让我的异步尝试变成了同步。

import asyncio

async def open_files(file):
    with open(file) as file:
        # do stuff
        print('opening files')
        return x

async def async_generator():
    file_outputs = await asyncio.gather(*[open_files(file) for file in files])

    for file_output in file_ouputs:
        print('using open file')
        for row in file_output:
            # Do stuff to row
            yield row

async def main():
    async for yield_value in async_generator():
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

输出:

opening files
opening files
.
.
.
using open file
using open file

编辑

使用@user4815162342 提供的代码,我注意到,尽管速度快了 3 倍,但生成器生成的行集与没有并发时生成的行集略有不同。我不确定这是不是因为每个文件都遗漏了一些产量,或者文件是否以某种方式重新排序。所以我对 user4815162342 的代码进行了以下更改,并在 pool.submit()

中输入了一个锁

我应该在第一次询问时提到,每个文件中的行和文件本身的顺序是必需的。

import concurrent.futures

def open_files(file):
    with open(file) as file:
        # do stuff
        print('opening files')
        return x

def generator():
    m = multiprocessing.Manager()
    lock = m.Lock()
    pool = concurrent.futures.ThreadPoolExecutor()
    file_output_futures = [pool.submit(open_files, file, lock) for file in files]
    for fut in concurrent.futures.as_completed(file_output_futures):
        file_output = fut.result()
        print('using open file')
        for row in file_output:
            # Do stuff to row
            yield row

def main():
    for yield_value in generator():
        pass

if __name__ == '__main__':
    main()

这样我的非并发和并发方法每次都会产生相同的值,但是我刚刚失去了使用并发获得的所有速度。

I feel like I'm missing something quite obvious here which is making my asynchronous attempts, synchronous.

您的代码有两个问题。第一个是 asyncio.gather() 按照设计等待 所有 期货并行完成,然后才 returns 它们的结果。因此,您在生成器中所做的处理并没有像您的意图那样穿插在 open_files 中的 IO 中,而是仅在对 open_files 的所有调用都返回后才开始。要在完成时处理异步调用,您应该使用 asyncio.as_completed.

之类的东西

第二个也是更基本的问题是,与可以并行化同步代码的线程不同,asyncio 要求一切都从头开始异步。将 async 添加到 open_files 之类的函数以使其异步是不够的。您需要检查代码并将任何阻塞调用(例如对 IO 的调用)替换为等效的异步原语。例如,连接到一个网络端口应该用open_connection 来完成,等等。如果你的 async 函数不等待任何东西,就像 open_files 的情况一样,它将完全像常规函数一样执行,你将不会获得 asyncio 的任何好处。

由于您在常规文件上使用 IO,并且操作系统不会为常规文件公开可移植的异步接口,因此您不太可能从 asyncio 中获利。有像 aiofiles 这样的库在后台使用线程,但它们可能会使您的代码变慢而不是加速,因为它们的 nice-looking 异步 API 涉及大量内部线程同步。要加快代码速度,您可以使用经典的线程池,Python 通过 concurrent.futures 模块公开该线程池。例如(未经测试):

import concurrent.futures

def open_files(file):
    with open(file) as file:
        # do stuff
        print('opening files')
        return x

def generator():
    pool = concurrent.futures.ThreadPoolExecutor()
    file_output_futures = [pool.submit(open_files, file) for file in files]
    for fut in file_output_futures:
        file_output = fut.result()
        print('using open file')
        for row in file_output:
            # Do stuff to row
            yield row

def main():
    for yield_value in generator():
        pass

if __name__ == '__main__':
    main()