如何从 peewee 的日期时间中减去时间增量?
How to subtract a timedelta from a datetime in peewee?
考虑下表:
class Recurring(db.Model):
schedule = ForeignKeyField(Schedule)
occurred_at = DateTimeField(default=datetime.now)
class Schedule(db.Model):
delay = IntegerField() # I would prefer if we had a TimeDeltaField
现在,我想要获取所有应该重复发生的事件:
query = Recurring.select(Recurring, Schedule).join(Schedule)
query = query.where(Recurring.occurred_at < now - Schedule.delay) # wishful
不幸的是,这不起作用。因此,我目前正在做如下事情:
for schedule in schedules:
then = now - timedelta(minutes=schedule.delay)
query = Recurring.select(Recurring, Schedule).join(Schedule)
query = query.where(Schedule == schedule, Recurring.occurred_at < then)
但是,现在我不是执行一个查询,而是执行多个查询。
有没有一种方法可以只使用一个查询来解决上述问题?我想到的一种解决方案是:
class Recurring(db.Model):
schedule = ForeignKeyField(Schedule)
occurred_at = DateTimeField(default=datetime.now)
repeat_after = DateTimeField() # repeat_after = occurred_at + delay
query = Recurring.select(Recurring, Schedule).join(Schedule)
query = query.where(Recurring.repeat_after < now)
但是,上面的schema违反了第三范式的规则。
每个数据库都实现了不同的日期时间添加功能,这很糟糕。所以这将取决于您使用的是什么数据库。
例如,对于 postgres,我们可以使用 "interval" 助手:
# Calculate the timestamp of the next occurrence. This is done
# by taking the last occurrence and adding the number of seconds
# indicated by the schedule.
one_second = SQL("INTERVAL '1 second'")
next_occurrence = Recurring.occurred_at + (one_second * Schedule.delay)
# Get all recurring rows where the current timestamp on the
# postgres server is greater than the calculated next occurrence.
query = (Recurring
.select(Recurring, Schedule)
.join(Schedule)
.where(SQL('current_timestamp') >= next_occurrence))
for recur in query:
print(recur.occurred_at, recur.schedule.delay)
如果您愿意,也可以用日期时间对象代替 "current_timestamp":
my_dt = datetime.datetime(2019, 3, 1, 3, 3, 7)
...
.where(Value(my_dt) >= next_occurrence)
对于 SQLite,你会做:
# Convert to a timestamp, add the scheduled seconds, then convert back
# to a datetime string for comparison with the last occurrence.
next_ts = fn.strftime('%s', Recurring.occurred_at) + Schedule.delay
next_occurrence = fn.datetime(next_ts, 'unixepoch')
对于MySQL,你会做:
# from peewee import NodeList
nl = NodeList((SQL('INTERVAL'), Schedule.delay, SQL('SECOND')))
next_occurrence = fn.date_add(Recurring.occurred_at, nl)
最后,我建议您为 models/fields 尝试更好的名字。即,Schedule.interval 而不是 Schedule.delay,Recurring.last_run 而不是 occurred_at。
考虑下表:
class Recurring(db.Model):
schedule = ForeignKeyField(Schedule)
occurred_at = DateTimeField(default=datetime.now)
class Schedule(db.Model):
delay = IntegerField() # I would prefer if we had a TimeDeltaField
现在,我想要获取所有应该重复发生的事件:
query = Recurring.select(Recurring, Schedule).join(Schedule)
query = query.where(Recurring.occurred_at < now - Schedule.delay) # wishful
不幸的是,这不起作用。因此,我目前正在做如下事情:
for schedule in schedules:
then = now - timedelta(minutes=schedule.delay)
query = Recurring.select(Recurring, Schedule).join(Schedule)
query = query.where(Schedule == schedule, Recurring.occurred_at < then)
但是,现在我不是执行一个查询,而是执行多个查询。
有没有一种方法可以只使用一个查询来解决上述问题?我想到的一种解决方案是:
class Recurring(db.Model):
schedule = ForeignKeyField(Schedule)
occurred_at = DateTimeField(default=datetime.now)
repeat_after = DateTimeField() # repeat_after = occurred_at + delay
query = Recurring.select(Recurring, Schedule).join(Schedule)
query = query.where(Recurring.repeat_after < now)
但是,上面的schema违反了第三范式的规则。
每个数据库都实现了不同的日期时间添加功能,这很糟糕。所以这将取决于您使用的是什么数据库。
例如,对于 postgres,我们可以使用 "interval" 助手:
# Calculate the timestamp of the next occurrence. This is done
# by taking the last occurrence and adding the number of seconds
# indicated by the schedule.
one_second = SQL("INTERVAL '1 second'")
next_occurrence = Recurring.occurred_at + (one_second * Schedule.delay)
# Get all recurring rows where the current timestamp on the
# postgres server is greater than the calculated next occurrence.
query = (Recurring
.select(Recurring, Schedule)
.join(Schedule)
.where(SQL('current_timestamp') >= next_occurrence))
for recur in query:
print(recur.occurred_at, recur.schedule.delay)
如果您愿意,也可以用日期时间对象代替 "current_timestamp":
my_dt = datetime.datetime(2019, 3, 1, 3, 3, 7)
...
.where(Value(my_dt) >= next_occurrence)
对于 SQLite,你会做:
# Convert to a timestamp, add the scheduled seconds, then convert back
# to a datetime string for comparison with the last occurrence.
next_ts = fn.strftime('%s', Recurring.occurred_at) + Schedule.delay
next_occurrence = fn.datetime(next_ts, 'unixepoch')
对于MySQL,你会做:
# from peewee import NodeList
nl = NodeList((SQL('INTERVAL'), Schedule.delay, SQL('SECOND')))
next_occurrence = fn.date_add(Recurring.occurred_at, nl)
最后,我建议您为 models/fields 尝试更好的名字。即,Schedule.interval 而不是 Schedule.delay,Recurring.last_run 而不是 occurred_at。