SQLAlchemy 查询与多列元组比较
SQLAlchemy query with where with comparison on tuple of multiple columns
我有一个查询需要在我的应用程序中支持分页。
def get_paginated_items(
self,
limit: int,
last_shipment_time: int = MAX_INT,
last_item_id: int = MAX_INT,
s = (
select([self.items])
.where((
self.items.c.shipment_time,
self.items.c.item_id,
) < (
last_shipment_time,
last_item_id,
))
.order_by(
self.items.c.shipment_time.desc(),
self.items.c.item_id.desc(),
)
.limit(limit)
)
with self.get_connection() as connection:
result = connection.execute(s)
return result.fetchall()
如果两个项目具有相同的shipment_time
,则应该在比较中使用item_id
打破平局。
显然 SQL 对此有支持 ()。但是,当我使用这个查询时,它似乎忽略了元组的第二个元素。
我正在为数据库使用 SQLite。
您需要在where条件中指定您使用的是元组:
from sqlalchemy import tuple_
def get_paginated_items(
self,
limit: int,
last_shipment_time: int = MAX_INT,
last_item_id: int = MAX_INT,
s = (
select([self.items])
.where(tuple_(
self.items.c.shipment_time,
self.items.c.item_id,
) < tuple_(
last_shipment_time,
last_item_id,
))
.order_by(
self.items.c.shipment_time.desc(),
self.items.c.item_id.desc(),
)
.limit(limit)
)
with self.get_connection() as connection:
result = connection.execute(s)
return result.fetchall()
我有一个查询需要在我的应用程序中支持分页。
def get_paginated_items(
self,
limit: int,
last_shipment_time: int = MAX_INT,
last_item_id: int = MAX_INT,
s = (
select([self.items])
.where((
self.items.c.shipment_time,
self.items.c.item_id,
) < (
last_shipment_time,
last_item_id,
))
.order_by(
self.items.c.shipment_time.desc(),
self.items.c.item_id.desc(),
)
.limit(limit)
)
with self.get_connection() as connection:
result = connection.execute(s)
return result.fetchall()
如果两个项目具有相同的shipment_time
,则应该在比较中使用item_id
打破平局。
显然 SQL 对此有支持 (
我正在为数据库使用 SQLite。
您需要在where条件中指定您使用的是元组:
from sqlalchemy import tuple_
def get_paginated_items(
self,
limit: int,
last_shipment_time: int = MAX_INT,
last_item_id: int = MAX_INT,
s = (
select([self.items])
.where(tuple_(
self.items.c.shipment_time,
self.items.c.item_id,
) < tuple_(
last_shipment_time,
last_item_id,
))
.order_by(
self.items.c.shipment_time.desc(),
self.items.c.item_id.desc(),
)
.limit(limit)
)
with self.get_connection() as connection:
result = connection.execute(s)
return result.fetchall()