Flask-SQLAlchemy:在特定时间之后自动更新记录
Flask-SQLAlchemy: update a record automatically after a specific time
我有一个这样的数据库模型:
class Payment(db.Model):
    """Ticket payment row: owning user, ticket status and departure date."""

    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    # ``default`` is a Column argument. Passed to db.Enum it is not applied
    # as the column default, so it is set on the Column itself.
    ticket_status = db.Column(
        db.Enum(TicketStatus, name='ticket_status'),
        default=TicketStatus.UNUSED,
    )
    departure_time = db.Column(db.Date)
我想在 datetime.utcnow() 超过 departure_time 的日期值之后,更改所有 ticket_status 的值。
我试过这样编码:
class TicketStatus(enum.Enum):
    """States a ticket can be in; each member's value equals its name."""

    UNUSED = 'UNUSED'
    USED = 'USED'
    EXPIRED = 'EXPIRED'

    def __repr__(self):
        """Return the member's value as a plain string."""
        return f'{self.value}'
class Payment(db.Model):
    """Payment model whose constructor tries to expire the ticket on creation."""

    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    # NOTE(review): ``default`` is passed to db.Enum here rather than to
    # db.Column -- confirm the column default is actually applied.
    ticket_status = db.Column(db.Enum(TicketStatus, name='ticket_status', default=TicketStatus.UNUSED))
    departure_time = db.Column(db.Date)
    # TODO | set ticket expirations time
    def __init__(self):
        """Mark the ticket EXPIRED if departure has passed, then persist it.

        Accepts no column values and does not call the base constructor.
        Any exception raised while adding or committing is swallowed after
        the session is rolled back.
        """
        # NOTE(review): departure_time is declared as a Date column while
        # utcnow() returns a datetime, and nothing has assigned
        # departure_time yet at this point -- this comparison presumably
        # raises TypeError; confirm.
        if datetime.utcnow() > self.departure_time:
            self.ticket_status = TicketStatus.EXPIRED.value
        try:
            db.session.add(self)
            db.session.commit()
        except Exception as e:
            # The error is discarded; only the rollback happens.
            db.session.rollback()
我也这样试过:
def ticket_expiration(self, payment_id):
    """Recompute the ticket status of one payment from its departure date.

    A payment already marked USED is left unchanged. Otherwise the status
    becomes EXPIRED when ``departure_time`` is before today's UTC date and
    UNUSED when it is not. An unknown ``payment_id`` is a no-op.

    Args:
        payment_id: Primary key of the ``Payment`` row to refresh.

    Returns:
        The string ``'ok'``, whether or not anything was changed.
    """
    import logging  # local import: only needed on the failure path below

    payment = Payment.query.filter_by(id=payment_id).first()
    # Check for a missing row *before* touching its attributes, and compare
    # enum members directly so a NULL status does not raise AttributeError.
    if payment is not None and payment.ticket_status != TicketStatus.USED:
        today = datetime.utcnow().date()
        departure = payment.departure_time
        # Compare date objects rather than their string forms; a missing
        # departure date counts as "not expired", as it did before.
        expired = departure is not None and departure < today
        # Store the enum member (not its ``.value`` string) so the attribute
        # keeps a consistent type in memory.
        payment.ticket_status = (
            TicketStatus.EXPIRED if expired else TicketStatus.UNUSED
        )
    try:
        db.session.commit()
    except Exception:
        # Still best-effort, but leave a trace instead of failing silently.
        db.session.rollback()
        logging.getLogger(__name__).exception(
            'ticket_expiration: commit failed for payment %s', payment_id
        )
    return 'ok'
但是当 datetime.utcnow() 超过 departure_time 的日期值时,似乎没有任何效果。
所以我的问题是:如何在设定的时间过后自动更改行中的值?
您可以只用 "used" 列替换您的状态列,该列将包含布尔值并为状态创建一个混合属性。 https://docs.sqlalchemy.org/en/13/orm/extensions/hybrid.html
class Payment(db.Model):
    """Payment whose ticket status is derived on read instead of stored."""

    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    used = db.Column(db.Boolean(), default=False)
    departure_time = db.Column(db.Date)

    @hybrid_property
    def status(self):
        """Return "EXPIRED", "USED" or "UNUSED" for this payment instance.

        NOTE(review): no ``@status.expression`` is defined, so this is only
        meaningful on instances, not in query filters -- confirm that is
        sufficient for callers.
        """
        departure = self.departure_time
        # departure_time is a Date column, so compare it with today's UTC
        # *date*; comparing a datetime with a date raises TypeError. A row
        # without a departure date is never reported as expired.
        if departure is not None and datetime.utcnow().date() > departure:
            return "EXPIRED"
        if self.used:
            return "USED"
        return "UNUSED"
最后我通过使用 flask_apscheduler 解决了这个问题,这里是我解决这个问题的代码片段:
安装flask_apscheduler:
pip3 install flask_apscheduler
创建新模块tasks.py
from datetime import datetime
from flask_apscheduler import APScheduler
from app import db
from app.models import Payment, TicketStatus
scheduler = APScheduler()
def ticket_expiration():
    """Scheduled job: refresh ``ticket_status`` on every payment.

    Runs inside the application context of ``scheduler.app``. Payments
    already marked USED are skipped; every other payment becomes EXPIRED
    when its ``departure_time`` is before today's UTC date and UNUSED when
    it is not. All changes are committed once, after the loop.

    Returns:
        The string ``'ok'``.
    """
    today = datetime.utcnow().date()
    app = scheduler.app
    with app.app_context():
        for payment in Payment.query.all():
            # Compare enum members directly so a NULL status does not raise.
            if payment.ticket_status == TicketStatus.USED:
                continue
            departure = payment.departure_time
            # Compare date objects rather than their string forms; a missing
            # departure date counts as "not expired", as it did before.
            expired = departure is not None and departure < today
            # Store the enum member (not its ``.value`` string) so the
            # attribute keeps a consistent type in memory.
            payment.ticket_status = (
                TicketStatus.EXPIRED if expired else TicketStatus.UNUSED
            )
        try:
            db.session.commit()
        except Exception:
            # Still best-effort, but record the failure in the app log
            # instead of discarding it.
            db.session.rollback()
            app.logger.exception('ticket_expiration: commit failed')
    return 'ok'
然后在__init__.py
中用flask app注册包
def create_app(config_class=Config):
    """Application factory: build the Flask app and start the scheduler.

    Args:
        config_class: Object whose upper-case attributes are loaded into
            ``app.config``. Defaults to ``Config``.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__)
    # Load the class that was passed in rather than the module-level Config,
    # so callers can supply a different configuration (e.g. for tests).
    app.config.from_object(config_class)
    # The other packages...
    # The other packages...
    scheduler.init_app(app)
    scheduler.start()
    return app
# import from other_module...
# To avoid SQLAlchemy circular import, do the import at the bottom.
from app.tasks import scheduler
这里是 config.py:
class Config(object):
    """Application settings, including the Flask-APScheduler job setup."""

    # The others config...
    # The others config...

    # Flask-apscheduler
    JOBS = [
        dict(
            id='ticket_expiration',
            func='app.tasks:ticket_expiration',
            trigger='interval',
            hours=1,  # run the task function once every hour
            replace_existing=True,
        ),
    ]
    SCHEDULER_JOBSTORES = dict(
        default=SQLAlchemyJobStore(url='sqlite:///flask_context.db'),
    )
    SCHEDULER_API_ENABLED = True
在上面的配置中,我们可以根据实际情况,让更新数据库的函数每隔 1 小时、若干秒或其他时间间隔执行一次。有关设置间隔时间的更多信息,请参阅 APScheduler 的 interval 触发器文档。
我希望这个回答对以后遇到这个问题的人有所帮助。
我有一个这样的数据库模型:
class Payment(db.Model):
    """Ticket payment row: owning user, ticket status and departure date."""

    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    # ``default`` is a Column argument. Passed to db.Enum it is not applied
    # as the column default, so it is set on the Column itself.
    ticket_status = db.Column(
        db.Enum(TicketStatus, name='ticket_status'),
        default=TicketStatus.UNUSED,
    )
    departure_time = db.Column(db.Date)
我想在 datetime.utcnow() 超过 departure_time 的日期值之后,更改所有 ticket_status 的值。
我试过这样编码:
class TicketStatus(enum.Enum):
    """States a ticket can be in; each member's value equals its name."""

    UNUSED = 'UNUSED'
    USED = 'USED'
    EXPIRED = 'EXPIRED'

    def __repr__(self):
        """Return the member's value as a plain string."""
        return f'{self.value}'
class Payment(db.Model):
    """Payment model whose constructor tries to expire the ticket on creation."""

    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    # NOTE(review): ``default`` is passed to db.Enum here rather than to
    # db.Column -- confirm the column default is actually applied.
    ticket_status = db.Column(db.Enum(TicketStatus, name='ticket_status', default=TicketStatus.UNUSED))
    departure_time = db.Column(db.Date)
    # TODO | set ticket expirations time
    def __init__(self):
        """Mark the ticket EXPIRED if departure has passed, then persist it.

        Accepts no column values and does not call the base constructor.
        Any exception raised while adding or committing is swallowed after
        the session is rolled back.
        """
        # NOTE(review): departure_time is declared as a Date column while
        # utcnow() returns a datetime, and nothing has assigned
        # departure_time yet at this point -- this comparison presumably
        # raises TypeError; confirm.
        if datetime.utcnow() > self.departure_time:
            self.ticket_status = TicketStatus.EXPIRED.value
        try:
            db.session.add(self)
            db.session.commit()
        except Exception as e:
            # The error is discarded; only the rollback happens.
            db.session.rollback()
我也这样试过:
def ticket_expiration(self, payment_id):
    """Recompute the ticket status of one payment from its departure date.

    A payment already marked USED is left unchanged. Otherwise the status
    becomes EXPIRED when ``departure_time`` is before today's UTC date and
    UNUSED when it is not. An unknown ``payment_id`` is a no-op.

    Args:
        payment_id: Primary key of the ``Payment`` row to refresh.

    Returns:
        The string ``'ok'``, whether or not anything was changed.
    """
    import logging  # local import: only needed on the failure path below

    payment = Payment.query.filter_by(id=payment_id).first()
    # Check for a missing row *before* touching its attributes, and compare
    # enum members directly so a NULL status does not raise AttributeError.
    if payment is not None and payment.ticket_status != TicketStatus.USED:
        today = datetime.utcnow().date()
        departure = payment.departure_time
        # Compare date objects rather than their string forms; a missing
        # departure date counts as "not expired", as it did before.
        expired = departure is not None and departure < today
        # Store the enum member (not its ``.value`` string) so the attribute
        # keeps a consistent type in memory.
        payment.ticket_status = (
            TicketStatus.EXPIRED if expired else TicketStatus.UNUSED
        )
    try:
        db.session.commit()
    except Exception:
        # Still best-effort, but leave a trace instead of failing silently.
        db.session.rollback()
        logging.getLogger(__name__).exception(
            'ticket_expiration: commit failed for payment %s', payment_id
        )
    return 'ok'
但是当 datetime.utcnow() 超过 departure_time 的日期值时,似乎没有任何效果。
所以我的问题是:如何在设定的时间过后自动更改行中的值?
您可以只用 "used" 列替换您的状态列,该列将包含布尔值并为状态创建一个混合属性。 https://docs.sqlalchemy.org/en/13/orm/extensions/hybrid.html
class Payment(db.Model):
    """Payment whose ticket status is derived on read instead of stored."""

    id = db.Column(db.Integer(), primary_key=True)
    user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
    used = db.Column(db.Boolean(), default=False)
    departure_time = db.Column(db.Date)

    @hybrid_property
    def status(self):
        """Return "EXPIRED", "USED" or "UNUSED" for this payment instance.

        NOTE(review): no ``@status.expression`` is defined, so this is only
        meaningful on instances, not in query filters -- confirm that is
        sufficient for callers.
        """
        departure = self.departure_time
        # departure_time is a Date column, so compare it with today's UTC
        # *date*; comparing a datetime with a date raises TypeError. A row
        # without a departure date is never reported as expired.
        if departure is not None and datetime.utcnow().date() > departure:
            return "EXPIRED"
        if self.used:
            return "USED"
        return "UNUSED"
最后我通过使用 flask_apscheduler 解决了这个问题,这里是我解决这个问题的代码片段:
安装flask_apscheduler:
pip3 install flask_apscheduler
创建新模块tasks.py
from datetime import datetime
from flask_apscheduler import APScheduler
from app import db
from app.models import Payment, TicketStatus
scheduler = APScheduler()
def ticket_expiration():
    """Scheduled job: refresh ``ticket_status`` on every payment.

    Runs inside the application context of ``scheduler.app``. Payments
    already marked USED are skipped; every other payment becomes EXPIRED
    when its ``departure_time`` is before today's UTC date and UNUSED when
    it is not. All changes are committed once, after the loop.

    Returns:
        The string ``'ok'``.
    """
    today = datetime.utcnow().date()
    app = scheduler.app
    with app.app_context():
        for payment in Payment.query.all():
            # Compare enum members directly so a NULL status does not raise.
            if payment.ticket_status == TicketStatus.USED:
                continue
            departure = payment.departure_time
            # Compare date objects rather than their string forms; a missing
            # departure date counts as "not expired", as it did before.
            expired = departure is not None and departure < today
            # Store the enum member (not its ``.value`` string) so the
            # attribute keeps a consistent type in memory.
            payment.ticket_status = (
                TicketStatus.EXPIRED if expired else TicketStatus.UNUSED
            )
        try:
            db.session.commit()
        except Exception:
            # Still best-effort, but record the failure in the app log
            # instead of discarding it.
            db.session.rollback()
            app.logger.exception('ticket_expiration: commit failed')
    return 'ok'
然后在__init__.py
中用flask app注册包def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(Config)
# The other packages...
# The other packages...
scheduler.init_app(app)
scheduler.start()
return app
# import from other_module...
# To avoid SQLAlchemy circular import, do the import at the bottom.
from app.tasks import scheduler
这里是 config.py:
class Config(object):
    """Application settings, including the Flask-APScheduler job setup."""

    # The others config...
    # The others config...

    # Flask-apscheduler
    JOBS = [
        dict(
            id='ticket_expiration',
            func='app.tasks:ticket_expiration',
            trigger='interval',
            hours=1,  # run the task function once every hour
            replace_existing=True,
        ),
    ]
    SCHEDULER_JOBSTORES = dict(
        default=SQLAlchemyJobStore(url='sqlite:///flask_context.db'),
    )
    SCHEDULER_API_ENABLED = True
在上面的配置中,我们可以根据实际情况,让更新数据库的函数每隔 1 小时、若干秒或其他时间间隔执行一次。有关设置间隔时间的更多信息,请参阅 APScheduler 的 interval 触发器文档。
我希望这个回答对以后遇到这个问题的人有所帮助。