在 BaseProxy 上调用方法时如何有效地使用 asyncio?

How to efficiently use asyncio when calling a method on a BaseProxy?

我正在开发一个应用程序,该应用程序使用 LevelDB 并针对不同的任务使用多个长期存在的进程。

由于 LevelDB 只允许单个进程维护数据库连接,我们所有的数据库访问都通过特殊的 数据库进程

要从另一个进程访问数据库,我们使用 BaseProxy。但是由于我们正在使用 asyncio 我们的代理不应该阻止这些 APIs 调用数据库进程然后最终从数据库读取。因此,我们使用执行程序在代理上实现 APIs。

    loop = asyncio.get_event_loop()

    return await loop.run_in_executor(
        thread_pool_executor,
        self._callmethod,
        method_name,
        args,
    )

虽然效果很好,但我想知道是否有更好的替代方法来将 BaseProxy_callmethod 调用包装在 ThreadPoolExecutor.

按我的理解,BaseProxy调用DB进程是等待IO的教科书示例,因此为此使用线程似乎是不必要的浪费。

在一个完美的世界中,我假设 async _acallmethod 存在于 BaseProxy 上,但不幸的是 API 不存在。

所以,我的问题基本上可以归结为:在使用 BaseProxy 时,是否有比 运行 更有效的替代方法 ThreadPoolExecutor 中的这些跨进程调用?

一个线程池就是你想要的。 aioprocessing 提供了 multiprocessing 的一些异步功能,但它使用您所建议的线程来实现。如果没有用于公开真正的异步多处理的问题,我建议针对 python 提出一个问题。

https://github.com/dano/aioprocessing

In most cases, this library makes blocking calls to multiprocessing methods asynchronous by executing the call in a ThreadPoolExecutor

假设您在同一系统中拥有 python 和数据库 运行ning(即您不希望 async 任何网络调用),您有两种选择。

  1. 你已经在做什么(运行 在执行者中)。它阻塞了数据库线程,但主线程仍然可以自由地做其他事情。这不是纯粹的非阻塞,但它是 I/O 阻塞情况下相当可接受的解决方案,维护线程的开销很小。

  2. 对于真正的非阻塞解决方案(可以在单个线程中 运行 而不会阻塞)你必须有#1。对来自数据库的每个提取调用的 async(回调)的本机支持,#2 将其包装在您的自定义事件循环实现中。在这里,您将 Base 循环子类化,并覆盖方法以集成您的数据库回调。例如,您可以创建一个实现管道服务器的基本循环。数据库写入管道并 python 轮询管道。请参阅 asyncio 代码库中 Proactor 事件循环的实现。注意:我从未实现过任何自定义事件循环。

我不熟悉 leveldb,但是对于键值存储,不清楚这样的回调对于 fetch 和纯非阻塞实现是否会有任何显着的好处。如果您在迭代器中进行多次提取,这是您的主要问题,您可以使循环 async (每次提取仍然阻塞)并可以提高性能。下面是解释这一点的虚拟代码。

import asyncio
import random
import time

async def talk_to_db(d):
    """ 
        blocking db iteration. sleep is the fetch function.
    """
    for k, v in d.items():
        time.sleep(1)
        yield (f"{k}:{v}")

async def talk_to_db_async(d):
    """ 
        real non-blocking db iteration. fetch (sleep) is native async here 
    """
    for k, v in d.items():
        await asyncio.sleep(1)
        yield (f"{k}:{v}")

async def talk_to_db_async_loop(d):
    """ 
        semi-non-blocking db iteration. fetch is blocking, but the
        loop is not.
    """
    for k, v in d.items():
        time.sleep(1)
        yield (f"{k}:{v}")
        await asyncio.sleep(0)

async def db_call_wrapper(db):
    async for row in talk_to_db(db):
        print(row)

async def db_call_wrapper_async(db):
    async for row in talk_to_db_async(db):
        print(row)

async def db_call_wrapper_async_loop(db):
    async for row in talk_to_db_async_loop(db):
        print(row)

async def func(i):
    await asyncio.sleep(5)
    print(f"done with {i}")

database = {i:random.randint(1,20) for i in range(20)}

async def main():
    db_coro = db_call_wrapper(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

async def main_async():
    db_coro = db_call_wrapper_async(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

async def main_async_loop():
    db_coro = db_call_wrapper_async_loop(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

# run the blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# run the non-blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())

# run the non-blocking (loop only) db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async_loop())

这是你可以尝试的。否则,我会说您当前的方法非常有效。我不认为 BaseProxy 可以给你一个异步调用 API,它不知道如何处理来自你的数据库的回调。

不幸的是,多处理库不适合转换为 asyncio,如果您必须使用 BaseProxy 来处理您的 IPC(进程间通信),您所能做的就是最好的。

虽然库确实在此处使用了阻塞 I/O,但您无法轻易进入并重新处理阻塞部分以使用非阻塞原语。如果你坚持走这条路,你必须修补或重写该库的内部实现细节,但作为内部实现细节,这些细节可能不同于 Python 点发布到点发布,使得任何补丁都脆弱且容易发生打破小 Python 升级。 _callmethod 方法是涉及线程、套接字或管道连接以及序列化程序的深层抽象层次结构的一部分。参见 multiprocessing/connection.py and multiprocessing/managers.py

所以你在这里的选择是坚持你当前的方法(使用线程池执行器将 BaseProxy._callmethod() 推送到另一个线程)or 来实现你自己的 IPC 解决方案使用异步原语。您的中央数据库访问进程将充当其他进程作为客户端连接的服务器,使用套接字或命名管道,对客户端请求和服务器响应使用商定的序列化方案。这就是 multiprocessing 为您实现的,但是您可以使用 asyncio streams 和最适合您的应用程序模式的任何序列化方案(例如 pickle、JSON、protobuffers 来实现您自己的(更简单的)版本,或其他完全不同的东西)。