如何处理 ProcessPool 中的 SQLAlchemy 连接?

How to handle SQLAlchemy Connections in ProcessPool?

我有一个从 RabbitMQ 代理获取消息并触发工作方法在进程池中处理这些消息的反应器,如下所示:

这是使用 python asyncioloop.run_in_executor()concurrent.futures.ProcessPoolExecutor 实现的。

现在我想使用 SQLAlchemy 在辅助方法中访问数据库。大多数情况下,处理将是非常直接和快速的 CRUD 操作。

反应器一开始每秒处理10-50条消息,所以不能接受每个请求都打开一个新的数据库连接。相反,我想为每个进程维护一个持久连接。

我的问题是:我该怎么做?我可以将它们存储在全局变量中吗? SQA 连接池会为我处理这个吗?反应堆停止时如何清理?

[更新]

为什么选择这种带有进程池的模式?

当前的实现使用不同的模式,其中每个消费者都在自己的线程中运行。不知何故,这不是很好。已经有大约 200 个消费者 运行 在他们自己的线程中,并且系统正在快速增长。为了更好地扩展,我们的想法是分离关注点并在 I/O 循环中使用消息并将处理委托给池。当然,整个系统的性能主要受I/O约束。但是,CPU 在处理大型结果集时会出现问题。

另一个原因是"ease of use."虽然消息的连接处理和消费是异步实现的,但worker中的代码可以是同步的并且很简单。

很快就很明显,从工作人员内部通过持久网络连接访问远程系统是一个问题。这就是 CommunicationChannels 的用途:在 worker 内部,我可以通过这些通道向消息总线授予请求。

我目前的一个想法是以类似的方式处理数据库访问:通过队列将语句传递到事件循环,然后将它们发送到数据库。但是,我不知道如何使用 SQLAlchemy 执行此操作。 切入点在哪里? 对象在通过队列传递时需要 pickled。我如何从 SQA 查询中获取这样的对象? 为了不阻塞事件循环,与数据库的通信必须异步进行。我可以使用例如aiomysql 作为 SQA 的数据库驱动程序?

@roman:挑战不错。

我以前遇到过类似的情况,所以这是我的 2 美分:除非这个消费者只有 "read""write" 消息,无需对其进行任何实际处理,您可以重新设计 此消费者作为 consumer/producer这将 消费 消息,它将处理消息,然后将结果放入另一个队列,该队列(比如说已处理的消息)可以由 1..N 非池化异步读取本应在其自身的整个生命周期中打开数据库连接的进程。

我可以扩展我的答案,但我不知道这种方法是否适合您的需求,如果是,我可以为您提供有关扩展设计的更多详细信息。

对我非常有用的一种方法是使用网络服务器来处理和扩展进程池。 flask-sqlalchemy 即使在其默认状态下也会保留一个连接池,并且不会在每个请求响应周期关闭每个连接。

asyncio 执行器只需调用 url 端点即可执行您的函数。额外的好处是,因为执行工作的所有进程都在 url 之后,您可以在多台机器上轻松扩展您的工作池,通过 gunicorn 或其他许多方法之一添加更多进程来扩展一个简单的 wsgi 服务器。另外,您还可以获得所有容错功能。

缺点是您可能会通过网络传递更多信息。但是,正如您所说,问题是 CPU 绑定,您可能会向数据库传入和传出更多数据。

如果您在实例化 session 时多加注意,假设您正在工作,则可以轻松满足 每个进程池进程一个数据库连接 的要求使用 orm,在工作进程中。

一个简单的解决方案是拥有一个全局 session,您可以跨请求重复使用它:

# db.py
engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine)) 

关于工人任务:

# task.py
from db import engine, DBSession
def task():
    DBSession.begin() # each task will get its own transaction over the global connection
    ...
    DBSession.query(...)
    ...
    DBSession.close() # cleanup on task end

参数 pool_sizemax_overflow customize the default QueuePool 由 create_engine 使用。pool_size 将确保您的进程在进程中每个进程只保持 1 个连接。池.

如果您希望它重新连接,您可以使用 DBSession.remove() 这将从注册表中删除会话,并使其在下次使用 DBSession 时重新连接。您还可以使用 Poolrecycle 参数使连接在指定的时间后重新连接。

在 development/debbuging 期间,您可以使用 AssertionPool which will raise an exception if more than one connection is checked-out from the pool, see switching pool implementations 了解如何做到这一点。