Python 3 asyncio 和 GIL(如何使用所有 cpu 核心 - 除了 ProcessPoolExecutor 之外的任何其他选项)?

Python 3 asyncio and GIL (how to use all cpu cores - any other options than ProcessPoolExecutor)?

如何使用所有 cpu 内核制作 asyncio - 除了 ProcessPoolExecutor 之外还有其他选择吗?

我假设 asyncio 不能打破 GIL 限制(也许我错了)所以程序将比 treading 版本执行得更快,但会在一个核心上执行。

我研究了一些示例,发现一种方法是多处理和 ProcessPoolExecutor。

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
    result = await loop.run_in_executor(
        pool, cpu_bound)
    print('custom process pool', result)

这很好,但进程之间需要 "pickle",因此需要一些开销并对传递的参数进行一些优化以减少 "pickle" 序列化。

利用上面这个简单的模式,我写了这样的测试代码(如果不喜欢可以跳过这段代码阅读,因为它和以前一样)。顺便说一句,这是我解析文件问题的最快解决方案。这部分代码不是整个程序。

def _match_general_and_specific_file_chunk(file_name):
    with codecs.open(file_name, encoding='utf8') as f:
        while True:
            lines = f.readlines(sizehint=10000)
            if not lines:
                break
            for line in lines:
                general_match = RE_RULES.match(line)
                if general_match:
                    specific_match = RULES[general_match.lastindex].match(line)
                    groups = list(specific_match.groups())
                    continue


async def _async_process_executor_match_general_and_specific_read_lines_with_limit_file_chunk():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        futures = []
        for file_name in get_file_names():
            future = loop.run_in_executor(pool, _match_general_and_specific_file_chunk, file_name)
            futures.append(future)
        await asyncio.gather(*futures)


def async_process_executor_match_general_and_specific_read_lines_with_limit_file_chunk():
    asyncio.run(_async_process_executor_match_general_and_specific_read_lines_with_limit_file_chunk())

How to make asyncio using all cpu cores - any other options than ProcessPoolExecutor?

Asyncio 是不适合这项工作的工具,因为它是专门为管理 IO 绑定程序的状态而设计的(您可以将其视为 Twisted 的继承者)。

要并行执行 CPU 绑定代码,您将需要线程或进程提供的 OS 级并发性。在 Python 中,最方便的方法是 concurrent.futures 模块,其中 类 与 ThreadPoolExecutor and ProcessPoolExecutor come from in the first place. You only need to submit() the pieces of work to the executor and wait 类似,用于完成生成的期货。

如果你想避免酸洗的开销,有两种方法:

  • 使用进程,并在控制进程和工作进程之间share state使用共享内存、映射内存或管理器对象。

  • 使用线程,但在执行 CPU 密集型工作时调用内部释放 GIL 的代码。一些软件包已经这样做了,例如 or many parts of numpy, but that won't help you if you have needs not covered by those packages. In that case you might need to write a new C extension, referring to the documentation 有关如何临时释放 GIL 以及何时可以安全释放 GIL 的详细信息。