Flask-SQLALchemy 在特定时间后自动更新记录

Flask-SQLALchemy update record automatically after specific time

我有一个这样的数据库模型:

class Payment(db.Model):
    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    ticket_status = db.Column(db.Enum(TicketStatus, name='ticket_status', default=TicketStatus.UNUSED))
    departure_time = db.Column(db.Date)

我想在 datetime.utcnow() 传递 departure_time 的日期值后更改所有 ticket_status 的值。

我试过这样编码:

class TicketStatus(enum.Enum):
    UNUSED = 'UNUSED'
    USED = 'USED'
    EXPIRED = 'EXPIRED'

    def __repr__(self):
        return str(self.value)


class Payment(db.Model):
    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    ticket_status = db.Column(db.Enum(TicketStatus, name='ticket_status', default=TicketStatus.UNUSED))
    departure_time = db.Column(db.Date)

    # TODO | set ticket expirations time
    def __init__(self):
        if datetime.utcnow() > self.departure_time:
            self.ticket_status = TicketStatus.EXPIRED.value
        try:
            db.session.add(self)
            db.session.commit()
        except Exception as e:
            db.session.rollback()

我也这样试过:

def ticket_expiration(self, payment_id):
    now = datetime.utcnow().strftime('%Y-%m-%d')
    payment = Payment.query.filter_by(id=payment_id).first()
    if payment.ticket_status.value == TicketStatus.USED.value:
        pass
    elif payment and str(payment.departure_time) < now:
        payment.ticket_status = TicketStatus.EXPIRED.value
    elif payment and str(payment.departure_time) >= now:
        payment.ticket_status = TicketStatus.UNUSED.value
    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
    return str('ok')

但是当datetime.utcnow()departure_time传递日期值时似乎没有效果。

所以我的问题是,如何在一组时间后自动更改行中的值..?

您可以只用 "used" 列替换您的状态列,该列将包含布尔值并为状态创建一个混合属性。 https://docs.sqlalchemy.org/en/13/orm/extensions/hybrid.html

class Payment(db.Model):
    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    used = db.Column(db.Boolean(), default=False)
    departure_time = db.Column(db.Date)

    @hybrid_property
    def status(self):
        if datetime.utcnow() > self.departure_time:
            return "EXPIRED"
        elif self.used:
            return "USED"
        return "UNUSED"

最后我通过使用 flask_apscheduler 解决了这个问题,这里是我解决这个问题的代码片段:

安装flask_apscheduler:

pip3 install flask_apscheduler

创建新模块tasks.py

from datetime import datetime

from flask_apscheduler import APScheduler

from app import db
from app.models import Payment, TicketStatus

scheduler = APScheduler()


def ticket_expiration():
    utc_now = datetime.utcnow().strftime('%Y-%m-%d')
    app = scheduler.app
    with app.app_context():
        payment = Payment.query.all()
        for data in payment:
            try:
                if data.ticket_status.value == TicketStatus.USED.value:
                    pass
                elif str(data.departure_time) < utc_now:
                    data.ticket_status = TicketStatus.EXPIRED.value
                elif str(data.departure_time) >= utc_now:
                    data.ticket_status = TicketStatus.UNUSED.value
            except Exception as e:
                print(str(e))
            try:
                db.session.commit()
            except Exception as e:
                db.session.rollback()
    return str('ok')

然后在__init__.py

中用flask app注册包
def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(Config)
    # The other packages...
    # The other packages...
    scheduler.init_app(app)
    scheduler.start()

    return app

# import from other_module...
# To avoid SQLAlchemy circular import, do the import at the bottom.
from app.tasks import scheduler 

这里是 config.py:

class Config(object):
    # The others config...
    # The others config...

    # Flask-apscheduler
    JOBS = [
        {
            'id': 'ticket_expiration',
            'func': 'app.tasks:ticket_expiration',
            'trigger': 'interval',
            'hours': 1, # call the task function every 1 hours
            'replace_existing': True
        }
    ]
    SCHEDULER_JOBSTORES = {
        'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db')
    }
    SCHEDULER_API_ENABLED = True

在上面的配置中,我们可以根据我们的情况调用每1小时,秒或其他时间更新db的函数,有关设置间隔时间的更多信息,我们可以看到它here

我希望这个回答对以后遇到这个问题的人有所帮助。