在 BaseProxy 上调用方法时如何有效地使用 asyncio?
How to efficiently use asyncio when calling a method on a BaseProxy?
我正在开发一个应用程序,该应用程序使用 LevelDB
并针对不同的任务使用多个长期存在的进程。
由于 LevelDB 只允许单个进程维护数据库连接,我们所有的数据库访问都通过特殊的 数据库进程。
要从另一个进程访问数据库,我们使用 BaseProxy
。但是由于我们正在使用 asyncio
我们的代理不应该阻止这些 APIs 调用数据库进程然后最终从数据库读取。因此,我们使用执行程序在代理上实现 APIs。
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
thread_pool_executor,
self._callmethod,
method_name,
args,
)
虽然效果很好,但我想知道是否有更好的替代方法来将 BaseProxy
的 _callmethod
调用包装在 ThreadPoolExecutor
.
中
按我的理解,BaseProxy
调用DB进程是等待IO的教科书示例,因此为此使用线程似乎是不必要的浪费。
在一个完美的世界中,我假设 async _acallmethod
存在于 BaseProxy
上,但不幸的是 API 不存在。
所以,我的问题基本上可以归结为:在使用 BaseProxy
时,是否有比 运行 更有效的替代方法 ThreadPoolExecutor
中的这些跨进程调用?
一个线程池就是你想要的。 aioprocessing 提供了 multiprocessing 的一些异步功能,但它使用您所建议的线程来实现。如果没有用于公开真正的异步多处理的问题,我建议针对 python 提出一个问题。
https://github.com/dano/aioprocessing
In most cases, this library makes blocking calls to multiprocessing methods asynchronous by executing the call in a ThreadPoolExecutor
假设您在同一系统中拥有 python 和数据库 运行ning(即您不希望 async
任何网络调用),您有两种选择。
你已经在做什么(运行 在执行者中)。它阻塞了数据库线程,但主线程仍然可以自由地做其他事情。这不是纯粹的非阻塞,但它是 I/O 阻塞情况下相当可接受的解决方案,维护线程的开销很小。
对于真正的非阻塞解决方案(可以在单个线程中 运行 而不会阻塞)你必须有#1。对来自数据库的每个提取调用的 async
(回调)的本机支持,#2 将其包装在您的自定义事件循环实现中。在这里,您将 Base 循环子类化,并覆盖方法以集成您的数据库回调。例如,您可以创建一个实现管道服务器的基本循环。数据库写入管道并 python 轮询管道。请参阅 asyncio
代码库中 Proactor 事件循环的实现。注意:我从未实现过任何自定义事件循环。
我不熟悉 leveldb,但是对于键值存储,不清楚这样的回调对于 fetch 和纯非阻塞实现是否会有任何显着的好处。如果您在迭代器中进行多次提取,这是您的主要问题,您可以使循环 async
(每次提取仍然阻塞)并可以提高性能。下面是解释这一点的虚拟代码。
import asyncio
import random
import time
async def talk_to_db(d):
"""
blocking db iteration. sleep is the fetch function.
"""
for k, v in d.items():
time.sleep(1)
yield (f"{k}:{v}")
async def talk_to_db_async(d):
"""
real non-blocking db iteration. fetch (sleep) is native async here
"""
for k, v in d.items():
await asyncio.sleep(1)
yield (f"{k}:{v}")
async def talk_to_db_async_loop(d):
"""
semi-non-blocking db iteration. fetch is blocking, but the
loop is not.
"""
for k, v in d.items():
time.sleep(1)
yield (f"{k}:{v}")
await asyncio.sleep(0)
async def db_call_wrapper(db):
async for row in talk_to_db(db):
print(row)
async def db_call_wrapper_async(db):
async for row in talk_to_db_async(db):
print(row)
async def db_call_wrapper_async_loop(db):
async for row in talk_to_db_async_loop(db):
print(row)
async def func(i):
await asyncio.sleep(5)
print(f"done with {i}")
database = {i:random.randint(1,20) for i in range(20)}
async def main():
db_coro = db_call_wrapper(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
async def main_async():
db_coro = db_call_wrapper_async(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
async def main_async_loop():
db_coro = db_call_wrapper_async_loop(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
# run the blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# run the non-blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())
# run the non-blocking (loop only) db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async_loop())
这是你可以尝试的。否则,我会说您当前的方法非常有效。我不认为 BaseProxy 可以给你一个异步调用 API,它不知道如何处理来自你的数据库的回调。
不幸的是,多处理库不适合转换为 asyncio,如果您必须使用 BaseProxy
来处理您的 IPC(进程间通信),您所能做的就是最好的。
虽然库确实在此处使用了阻塞 I/O,但您无法轻易进入并重新处理阻塞部分以使用非阻塞原语。如果你坚持走这条路,你必须修补或重写该库的内部实现细节,但作为内部实现细节,这些细节可能不同于 Python 点发布到点发布,使得任何补丁都脆弱且容易发生打破小 Python 升级。 _callmethod
方法是涉及线程、套接字或管道连接以及序列化程序的深层抽象层次结构的一部分。参见 multiprocessing/connection.py
and multiprocessing/managers.py
。
所以你在这里的选择是坚持你当前的方法(使用线程池执行器将 BaseProxy._callmethod()
推送到另一个线程)or 来实现你自己的 IPC 解决方案使用异步原语。您的中央数据库访问进程将充当其他进程作为客户端连接的服务器,使用套接字或命名管道,对客户端请求和服务器响应使用商定的序列化方案。这就是 multiprocessing
为您实现的,但是您可以使用 asyncio
streams 和最适合您的应用程序模式的任何序列化方案(例如 pickle、JSON、protobuffers 来实现您自己的(更简单的)版本,或其他完全不同的东西)。
我正在开发一个应用程序,该应用程序使用 LevelDB
并针对不同的任务使用多个长期存在的进程。
由于 LevelDB 只允许单个进程维护数据库连接,我们所有的数据库访问都通过特殊的 数据库进程。
要从另一个进程访问数据库,我们使用 BaseProxy
。但是由于我们正在使用 asyncio
我们的代理不应该阻止这些 APIs 调用数据库进程然后最终从数据库读取。因此,我们使用执行程序在代理上实现 APIs。
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
thread_pool_executor,
self._callmethod,
method_name,
args,
)
虽然效果很好,但我想知道是否有更好的替代方法来将 BaseProxy
的 _callmethod
调用包装在 ThreadPoolExecutor
.
按我的理解,BaseProxy
调用DB进程是等待IO的教科书示例,因此为此使用线程似乎是不必要的浪费。
在一个完美的世界中,我假设 async _acallmethod
存在于 BaseProxy
上,但不幸的是 API 不存在。
所以,我的问题基本上可以归结为:在使用 BaseProxy
时,是否有比 运行 更有效的替代方法 ThreadPoolExecutor
中的这些跨进程调用?
一个线程池就是你想要的。 aioprocessing 提供了 multiprocessing 的一些异步功能,但它使用您所建议的线程来实现。如果没有用于公开真正的异步多处理的问题,我建议针对 python 提出一个问题。
https://github.com/dano/aioprocessing
In most cases, this library makes blocking calls to multiprocessing methods asynchronous by executing the call in a ThreadPoolExecutor
假设您在同一系统中拥有 python 和数据库 运行ning(即您不希望 async
任何网络调用),您有两种选择。
你已经在做什么(运行 在执行者中)。它阻塞了数据库线程,但主线程仍然可以自由地做其他事情。这不是纯粹的非阻塞,但它是 I/O 阻塞情况下相当可接受的解决方案,维护线程的开销很小。
对于真正的非阻塞解决方案(可以在单个线程中 运行 而不会阻塞)你必须有#1。对来自数据库的每个提取调用的
async
(回调)的本机支持,#2 将其包装在您的自定义事件循环实现中。在这里,您将 Base 循环子类化,并覆盖方法以集成您的数据库回调。例如,您可以创建一个实现管道服务器的基本循环。数据库写入管道并 python 轮询管道。请参阅asyncio
代码库中 Proactor 事件循环的实现。注意:我从未实现过任何自定义事件循环。
我不熟悉 leveldb,但是对于键值存储,不清楚这样的回调对于 fetch 和纯非阻塞实现是否会有任何显着的好处。如果您在迭代器中进行多次提取,这是您的主要问题,您可以使循环 async
(每次提取仍然阻塞)并可以提高性能。下面是解释这一点的虚拟代码。
import asyncio
import random
import time
async def talk_to_db(d):
"""
blocking db iteration. sleep is the fetch function.
"""
for k, v in d.items():
time.sleep(1)
yield (f"{k}:{v}")
async def talk_to_db_async(d):
"""
real non-blocking db iteration. fetch (sleep) is native async here
"""
for k, v in d.items():
await asyncio.sleep(1)
yield (f"{k}:{v}")
async def talk_to_db_async_loop(d):
"""
semi-non-blocking db iteration. fetch is blocking, but the
loop is not.
"""
for k, v in d.items():
time.sleep(1)
yield (f"{k}:{v}")
await asyncio.sleep(0)
async def db_call_wrapper(db):
async for row in talk_to_db(db):
print(row)
async def db_call_wrapper_async(db):
async for row in talk_to_db_async(db):
print(row)
async def db_call_wrapper_async_loop(db):
async for row in talk_to_db_async_loop(db):
print(row)
async def func(i):
await asyncio.sleep(5)
print(f"done with {i}")
database = {i:random.randint(1,20) for i in range(20)}
async def main():
db_coro = db_call_wrapper(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
async def main_async():
db_coro = db_call_wrapper_async(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
async def main_async_loop():
db_coro = db_call_wrapper_async_loop(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
# run the blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# run the non-blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())
# run the non-blocking (loop only) db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async_loop())
这是你可以尝试的。否则,我会说您当前的方法非常有效。我不认为 BaseProxy 可以给你一个异步调用 API,它不知道如何处理来自你的数据库的回调。
不幸的是,多处理库不适合转换为 asyncio,如果您必须使用 BaseProxy
来处理您的 IPC(进程间通信),您所能做的就是最好的。
虽然库确实在此处使用了阻塞 I/O,但您无法轻易进入并重新处理阻塞部分以使用非阻塞原语。如果你坚持走这条路,你必须修补或重写该库的内部实现细节,但作为内部实现细节,这些细节可能不同于 Python 点发布到点发布,使得任何补丁都脆弱且容易发生打破小 Python 升级。 _callmethod
方法是涉及线程、套接字或管道连接以及序列化程序的深层抽象层次结构的一部分。参见 multiprocessing/connection.py
and multiprocessing/managers.py
。
所以你在这里的选择是坚持你当前的方法(使用线程池执行器将 BaseProxy._callmethod()
推送到另一个线程)or 来实现你自己的 IPC 解决方案使用异步原语。您的中央数据库访问进程将充当其他进程作为客户端连接的服务器,使用套接字或命名管道,对客户端请求和服务器响应使用商定的序列化方案。这就是 multiprocessing
为您实现的,但是您可以使用 asyncio
streams 和最适合您的应用程序模式的任何序列化方案(例如 pickle、JSON、protobuffers 来实现您自己的(更简单的)版本,或其他完全不同的东西)。