Flask 应用程序中的异步方法未在后台执行

Async Method Not Executed in Background in Flask Application

我有一个用 Flask 和 Graphene 包构建的 GraphQL API,运行 Python 3.5.4。其中一个 GraphQL 突变需要一些时间来执行(2 到 5 分钟),我不希望最终用户等待其执行完成。我希望在后台执行突变方法并立即 return 向用户发送消息。

我查看了 asyncio 包,但出于某种原因,执行仍然出现在最前面,我的脚本正在等待。你知道我做错了什么吗?脚本很长,所以我在下面总结了与 asyncio 相关的关键元素。

文件mutation_migration_plan.py

from migration_script import Migration, main
import asyncio
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
    [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(migration_plan))
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

文件migration_script.py

class Migration():
    """Class to execute migration of Plan, Step, Object."""

    @staticmethod
    async def migrate(migration_plan, migration_step=None, migration_object=None):
        [...]

async def main(migration_plan, migration_step=None, migration_object=None):
    asyncio.ensure_future(Migration.migrate(migration_plan, migration_step, migration_object))

基本上我希望在我的控制台 window 中几乎立即看到 print('Migration plan execution has started. You will receive an e-mail when it is terminated.') 而方法 loop.run_until_complete(main(migration_plan)) 但现在情况并非如此,打印仅出现在执行结束。

[更新]

根据下面@Vincent 的回答,我更新了第一个文件 File mutation_migration_plan.py 以使用 ThreadPoolExecutor 并删除了两个文件中与 asyncio 相关的所有内容。

文件mutation_migration_plan.py

from migration_script import Migration
from concurrent.futures import ThreadPoolExecutor
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
        [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(Migration.migrate, migration_plan)
        # print(future.result())
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

当我添加 print(future.result()) 行时,我的脚本运行良好,但它不会在后台执行(这是有道理的,因为我正在尝试打印结果)。但是,当我注释掉打印时,我的方法 Migration.migrate 似乎没有正确执行(我知道是因为我没有在我的数据库中看到结果)。知道为什么吗?

[更新二]

文件mutation_migration_plan.py

我已经能够使用 ProcessPoolExecutor 异步执行我的方法并删除两个文件上对 asyncio 的所有引用。见以下代码:

文件mutation_migration_plan.py

from concurrent.futures import ProcessPoolExecutor
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
        [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        executor = ProcessPoolExecutor()
        executor.submit(Migration.migrate, migration_plan.id)
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

它可以正常工作,但尽管进程在后端执行,但我的 Falsk 应用程序需要很长时间才能发送 http 响应,并且响应有时为空。

这是一个常见的误解,但您不能将 asyncio 插入现有应用程序并期望它能正常工作。在 asyncio 中,每个阻塞调用都必须在协程上下文中使用 await 语法。只有这样才能实现单线程并发。这意味着您必须使用 aiohttp instead of flask, along with a library like aiohttp-graphql.

这需要对您的应用程序进行重大重写。如果您不想经历这些,还有其他解决方案可以与 Flask 很好地集成。你可以使用 celery as pointed by @dirn, or one of the executors provided by concurrent.futures.