为什么我的异步函数 运行 同步 Python3.9?

Why is my async function running synchronously Python3.9?

我正在尝试使用 asynciofutures 在单独的线程上 运行 一个函数。我有一个装饰器,它异步地接受 long 运行ning 函数及其参数并输出它的值。不幸的是,这些进程似乎不是异步工作的。

def multiprocess(self, function, executor=None, *args, **kwargs):
    async def run_task(function, *args, **kwargs):
        @functools.wraps(function)
        async def wrap(*args, **kwargs):
            while True:
                execution_runner = executor or self._DEFAULT_POOL_
                executed_job = execution_runner.submit(function, *args, **kwargs)
                print(
                    f"Pending {function.__name__}:",
                    execution_runner._work_queue.qsize(),
                    "jobs",
                )
                print(
                    f"Threads: {function.__name__}:", len(execution_runner._threads)
                )
                future = await asyncio.wrap_future(executed_job)
                return future

        return wrap

    return asyncio.run(run_task(function, *args, **kwargs))

要调用装饰器,我有两个函数 _async_tasktask_function_async_task 包含一个循环,该循环 运行s task_function 用于每个需要处理的文档。

@staticmethod
def _async_task(documents):
    processed_docs = asyncio.run(task_function(documents))
    return processed_docs

task_function按如下方式处理文档中的每个文档,

@multiprocess
async def task_function(documents):
    processed_documents = None
    try:
        for doc in documents:
            processed_documents = process_document(doc)
            print(processed_documents)
    except Exception as err:
        print(err)
    return processed_documents

这不能异步工作的线索是我对多线程装饰器的诊断打印如下。

Pending summarise_news: 0 jobs
Threads: summarise_news: 2

由于没有待处理的作业,并且整个过程所花的时间与同步 运行 一样长,因此 运行ning 是同步的。

我在设置您的代码时遇到了一些问题,但我想我已经找到了答案。

首先,正如@dano 在他的评论中提到的,asyncio.run 阻塞直到协程 运行 完成。因此,您不会通过使用这种方法获得任何加速。

我使用了稍微修改过的 multiprocess 装饰器

def multiprocess(executor=None, *args, **kwargs):
    def run_task(function, *args, **kwargs):
        def wrap(*args, **kwargs):

            execution_runner = executor or DEFAULT_EXECUTOR
            executed_job = execution_runner.submit(function, *args, **kwargs)
            print(
                f"Pending {function.__name__}:",
                execution_runner._work_queue.qsize(),
                "jobs",
            )
            print(
                f"Threads: {function.__name__}:", len(execution_runner._threads)
            )
            future = asyncio.wrap_future(executed_job)

            return future

        return wrap
    return run_task

如您所见,这里没有 asyncio.run,装饰器和内部包装器都是同步的,因为 asyncio.wrap_future 不需要 await

更新后的 multiprocess 装饰器现在与 process_document 函数一起使用。这样做的原因是您不会从并行化按顺序调用阻塞函数的函数中获得任何好处。您必须 将阻塞函数 转换为可在执行程序中运行。

注意这个虚拟 process_document 与我描述的完全一样 - 完全阻塞和同步。

@multiprocess()
def process_document(doc):
    print(f"Processing doc: {doc}...")
    time.sleep(2)
    print(f"Doc {doc} done.")

现在,到了最后一点。我们已经通过将其转换为可在执行程序中运行来使 process_document 成为一种异步,但它仍然很重要,您如何调用它。

考虑以下示例:

for doc in documents:
    result = await process_document(doc)
results = await asyncio.gather(*[process_document(doc) for doc in documents])

在前者中,我们将按顺序等待协程,必须等到一个完成才能启动另一个。 在后一个示例中,它们将并行执行,因此 确实取决于 您调用协程执行的准确程度。

这是我使用的完整代码片段:

import asyncio
import concurrent.futures
import time


DEFAULT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def multiprocess(executor=None, *args, **kwargs):
    def run_task(function, *args, **kwargs):
        def wrap(*args, **kwargs):

            execution_runner = executor or DEFAULT_EXECUTOR
            executed_job = execution_runner.submit(function, *args, **kwargs)
            print(
                f"Pending {function.__name__}:",
                execution_runner._work_queue.qsize(),
                "jobs",
            )
            print(
                f"Threads: {function.__name__}:", len(execution_runner._threads)
            )
            future = asyncio.wrap_future(executed_job)

            return future

        return wrap
    return run_task


@multiprocess()
def process_document(doc):
    print(f"Processing doc: {doc}...")
    time.sleep(2)
    print(f"Doc {doc} done.")


async def task_function_sequential(documents):
    start = time.time()
    for doc in documents:
        await process_document(doc)

    end = time.time()
    print(f"task_function_sequential took: {end-start}s")


async def task_function_parallel(documents):
    start = time.time()

    jobs = [process_document(doc) for doc in documents]
    await asyncio.gather(*jobs)

    end = time.time()
    print(f"task_function_parallel took: {end-start}s")


async def main():
    documents = [i for i in range(5)]
    await task_function_sequential(documents)
    await task_function_parallel(documents)


asyncio.run(main())

请注意,task_function_parallel 示例仍然需要大约 4 秒,而不是 2 秒,因为线程池限制为 4 个工人,作业数为 5,所以最后一个作业将等待一些工人可用。