在 scoped_session 中跨越进程边界

across process boundary in scoped_session

我正在使用 SQLAlchemy 和多处理。我也使用 scoped_session 因为它避免共享同一个会话,但我发现了一个错误和他们的解决方案,但我不明白为什么会这样。

你可以在下面看到我的代码:

db.py

engine = create_engine(connection_string)

Session = sessionmaker(bind=engine)
DBSession = scoped_session(Session)

script.py

from multiprocessing import Pool, current_process
from db import DBSession

def process_feed(test):
    session = DBSession()
    print(current_process().name, session)

def run():
    session = DBSession()
    pool = Pool()
    print(current_process().name, session)
    pool.map_async(process_feed, [1, 2]).get()

if __name__ == "__main__":
    run()

当我运行script.py输出为:

MainProcess <sqlalchemy.orm.session.Session object at 0xb707b14c>
ForkPoolWorker-1 <sqlalchemy.orm.session.Session object at 0xb707b14c>
ForkPoolWorker-2 <sqlalchemy.orm.session.Session object at 0xb707b14c>

请注意,会话对象在主进程及其工作进程(子进程)中是相同的0xb707b14c

但是如果我改变前两行的顺序 运行():

def run():
    pool = Pool() # <--- Now pool is instanced in the first line
    session = DBSession()  # <--- Now session is instanced in the second line
    print(current_process().name, session)
    pool.map_async(process_feed, [1, 2]).get()

而我 运行 script.py 再次输出为:

MainProcess <sqlalchemy.orm.session.Session object at 0xb66907cc>
ForkPoolWorker-1 <sqlalchemy.orm.session.Session object at 0xb669046c>
ForkPoolWorker-2 <sqlalchemy.orm.session.Session object at 0xb66905ec>

现在会话实例不同了。

要理解为什么会发生这种情况,您需要了解 scoped_sessionPool 的实际作用。 scoped_session 保留会话注册表以便发生以下情况

  • 您第一次调用 DBSession 时,它会在注册表中为您创建一个 Session 对象
  • 随后,如果满足必要条件(即同一线程,会话尚未关闭),它不会创建新的 Session 对象,而是 returns 您之前创建的 Session 对象返回

当您创建 Pool 时,它会在 __init__ 方法中创建工人。 (请注意,在 __init__ 中启动工作进程没有任何基础。同样有效的实现可以等到第一次需要工作人员后再启动它们,这将在您的示例中表现出不同的行为。)当这种情况发生时(在 Unix 上) ), 父进程forks自身为每个worker进程,这涉及到操作系统将当前运行ning进程的内存复制到一个新进程中,所以你会从字面上得到完全相同的对象在完全相同的地方。

将这两个放在一起,在第一个示例中,您在分叉之前创建了一个 Session,它在创建 Pool 期间被复制到所有工作进程,导致相同的标识,而在第二个示例中,您将 Session 对象的创建延迟到工作进程启动之后,从而导致不同的身份。

重要的是要注意,虽然 Session 对象共享相同的 id,但它们 不是 相同的对象,从某种意义上说,如果您在父进程中更改有关 Session 的任何内容,它们将不会反映在子进程中。由于分叉,它们恰好共享相同的内存地址。 但是,OS级资源,如连接共享的,所以如果你运行查询sessionPool() 之前,会在连接池中为您创建一个连接,然后分叉到子进程中。如果您随后尝试在子进程中执行查询,您将 运行 出现奇怪的错误,因为您的进程通过相同的确切连接相互破坏!

以上内容对 Windows 没有实际意义,因为 Windows 没有 fork().

TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.

https://docs.sqlalchemy.org/en/13/core/pooling.html#using-connection-pools-with-multiprocessing