使用 flask sql alchemy 以原子性更新单行

update a single row with atomicity using flask sql alchemy

我的代码如下

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class Status(db.Model):
    __tablename__ = 'status'

    process_id = db.Column(db.Integer, unique=True)
    process_name = db.Column(db.String(20))
    process_status = db.Column(db.String(10))
    
    @classmethod
    def find_by_id_status(cls, process_status, process_id):
        return cls.query.filter_by(process_status=process_status, process_id=process_id).first()

    def save_to_db(self):
        db.session.add(self)
        db.session.commit()

我可以通过获取记录进行更新,并按以下方式更新

process = Status.find_by_id_status(process_status="pending", process_id="12345")
process.process_status = "processing"
process.save_to_db()

但在我的场景中,有并行请求更新它。

我希望第一个请求只更新相同的请求并为进一步的请求抛出错误

相当于mysqlshell命令

UPDATE status SET process_status = "processing" WHERE process_status = "pending" AND process_id = "12345"

第一个请求returns

查询正常,0 行受影响(0.01 秒) 匹配行:1 更改:1 警告:0

以及进一步的请求

查询正常,0 行受影响(0.01 秒) 行匹配:0 更改:0 警告:0

请帮助我使用炼金术 'bindparam' 或任何其他方式实现同​​样的目标... 提前致谢

我找到了解决方案:-

相当于sql shell cmd

UPDATE status SET process_status = "processing" WHERE process_status = "pending" AND process_id = "12345"

在 SQLAlchemy 中是

# add this to existing methods

@classmethod
def update_status(cls, process_id, process_status, new_status):
    return cls.query.filter_by(
        process_id=process_id, 
        process_status=process_status
        ).update(
            {
                'process_status': new_status
            },
            synchronize_session='fetch'
    )

用法 第一个请求

result = Status.update_status(process_id="12345", process_status="pending", new_status="processing")

打印结果

1

(1 条记录匹配并更新)

下次开始

result = Status.update_status(process_id="12345", process_status="pending", new_status="processing")

打印结果

0

(0 条记录匹配)