FastApi Sqlalchemy 如何管理事务(会话和多次提交)

FastApi Sqlalchemy how to manage transaction (session and multiple commits)

我有一个带有插入和更新功能的 CRUD,每个功能的末尾都有 commit,如下所示:

@staticmethod
def insert(db: Session, item: Item) -> None:
    db.add(item)
    db.commit()
   
   
@staticmethod
def update(db: Session, item: Item) -> None:
    ...
    db.commit()

我有一个端点从 FastAPI 依赖项接收 sqlalchemy 会话,需要自动插入和更新(数据库事务)。

处理事务时的最佳做法是什么?我无法使用 CRUD,因为它不止一个 commit.

我应该如何处理交易?你在哪里提交你的会话?在增删改查?或者每个请求在 FastAPI 依赖函数中只出现一次?

我在使用 FastAPI 时遇到了同样的问题。我找不到在单独的方法中使用 commit 并让它们进行事务处理的方法。 我最终做的是 flush 而不是 commit,它将更改发送到数据库,但不提交事务。

需要注意的一件事是,在 FastAPI 中,每个请求都会打开一个新会话,并在完成后关闭它。这将是使用 SQLAlchemy 文档中的 example 发生的事情的粗略示例。

def run_my_program():
    # This happens in the `database = SessionLocal()` of the `get_db` method below
    session = Session()
    try:
        ThingOne().go(session)
        ThingTwo().go(session)

        session.commit()
    except:
        session.rollback()
        raise
    finally:
        # This is the same as the `get_db` method below
        session.close()

为请求生成的会话已经是一个事务。当您提交该会话时,实际正在做的是这个

When using the Session in its default mode of autocommit=False, a new transaction will be begun immediately after the commit, but note that the newly begun transaction does not use any connection resources until the first SQL is actually emitted.

阅读后我认为在端点范围内处理 commitrollback 是有意义的。

我创建了一个虚拟示例来说明它是如何工作的。我使用 FastAPI guide.

中的所有内容
def create_user(db: Session, user: UserCreate):
    """
    Create user record
    """
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.flush() # Changed this to a flush
    return db_user

然后在endpoint中使用crud操作如下

from typing import List
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session

...

def get_db():
    """
    Get SQLAlchemy database session
    """
    database = SessionLocal()
    try:
        yield database
    finally:
        database.close()

@router.post("/users", response_model=List[schemas.User])
def create_users(user_1: schemas.UserCreate, user_2: schemas.UserCreate, db: Session = Depends(get_db)):
    """
    Create two users
    """
    try:
        user_1 = crud.create_user(db=db, user=user_1)
        user_2 = crud.create_user(db=db, user=user_2)
        db.commit()
        return [user_1, user_2]
    except:
        db.rollback()
        raise HTTPException(status_code=400, detail="Duplicated user")

将来我可能会研究将其移至中间件,但我认为使用 commit 无法获得所需的行为。