Query filter over ForeignKey in SQLAlchemy

To keep things simple, I have two tables (SQLAlchemy ORM, used with FastAPI):

from sqlalchemy import Column, ForeignKey, Identity, Integer
from sqlalchemy.dialects.oracle import VARCHAR2
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class Object(Base):
    __tablename__ = "object"
    id = Column('id', Integer, Identity(start=1), primary_key=True)
    name = Column(VARCHAR2(30), unique=True, index=True)
    # one Object has many Attributes; deletes cascade to the child rows
    attributes = relationship("Attributes", backref="parent", cascade="all, delete", passive_deletes=True)

class Attributes(Base):
    __tablename__ = "attributes"
    id = Column('id', Integer, Identity(start=1), primary_key=True)
    attribute = Column(VARCHAR2(200), index=True)
    value = Column(VARCHAR2(2000), index=True)
    parent_id = Column(Integer, ForeignKey("object.id", ondelete="CASCADE"), nullable=False)

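An object can have several attributes (a 1-N relationship). The attributes are dynamic: depending on the object, some objects have 10 attributes, others 50...

For example: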

Object      | Attributes
---------------------------------
Object 1    | color = red
            | form = round
            | level = 5
            | ...
            | attribute alpha
---------------------------------
Object 2    | color = red
            | form = square
            | level = 2
            | ...
            | attribute beta

I would like to do something like this:

"Find all objects where attribute.color = red and attribute.level >= 2 and attribute.X is defined"

I tried:

query = db.query(Object).options( 
 joinedload(Attributes,innerjoin=False)).join(Attributes)
query = query.filter(Attributes.attribute == 'color')
query = query.filter(Attributes.value == 'red')
...
return query.all()

But I don't know how to chain several filters on the attributes table.

Thanks for your help...

To implement the filter, I would use any() on the relationship:

from sqlalchemy import Integer, and_, func

query = (
    session.query(Object)
    # NOTE: the join below is not needed for the filter part:
    # .options(joinedload(Attributes, innerjoin=False))
    # .join(Attributes)
    # add the filter criteria, one any() call per attribute
    .filter(
        Object.attributes.any(
            and_(
                Attributes.attribute == "color",
                Attributes.value == "red",
            )
        )
    )
    .filter(
        Object.attributes.any(
            and_(
                Attributes.attribute == "level",
                func.cast(Attributes.value, Integer) >= 2,
            )
        )
    )
    .filter(Object.attributes.any(Attributes.attribute == "X"))  # attribute X exists, whatever its value
)

This produces the following SQL statement (the exact text depends on the database engine):

SELECT object.id,
       object.name
FROM object
WHERE (EXISTS
         (SELECT 1
          FROM attributes
          WHERE object.id = attributes.parent_id
            AND attributes.attribute = 'color'
            AND attributes.value = 'red'))
  AND (EXISTS
         (SELECT 1
          FROM attributes
          WHERE object.id = attributes.parent_id
            AND attributes.attribute = 'level'
            AND CAST(attributes.value AS INTEGER) >= 2))
  AND (EXISTS
         (SELECT 1
          FROM attributes
          WHERE object.id = attributes.parent_id
            AND attributes.attribute = 'X'))
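
To try this end to end, here is a minimal sketch that runs the same any() filters against an in-memory SQLite database. It is not the original Oracle setup: String stands in for VARCHAR2, Identity is dropped, the make_object helper and the sample rows are made up for illustration, and cast() from sqlalchemy is used in place of func.cast.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, and_, cast, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Object(Base):
    __tablename__ = "object"
    id = Column(Integer, primary_key=True)
    name = Column(String(30), unique=True, index=True)
    attributes = relationship(
        "Attributes", backref="parent", cascade="all, delete", passive_deletes=True
    )


class Attributes(Base):
    __tablename__ = "attributes"
    id = Column(Integer, primary_key=True)
    attribute = Column(String(200), index=True)
    value = Column(String(2000), index=True)
    parent_id = Column(Integer, ForeignKey("object.id", ondelete="CASCADE"), nullable=False)


def make_object(name, **attrs):
    """Hypothetical helper: build an Object with one Attributes row per keyword."""
    return Object(
        name=name,
        attributes=[Attributes(attribute=k, value=str(v)) for k, v in attrs.items()],
    )


engine = create_engine("sqlite://")  # in-memory database, for the demo only
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Made-up sample data: only "Object 1" satisfies all three conditions.
    session.add_all([
        make_object("Object 1", color="red", form="round", level=5, X="alpha"),
        make_object("Object 2", color="red", form="square", level=2),          # no X
        make_object("Object 3", color="blue", form="round", level=7, X="beta"),  # not red
        make_object("Object 4", color="red", form="round", level=1, X="gamma"),  # level < 2
    ])
    session.commit()

    query = (
        session.query(Object)
        # color = red
        .filter(Object.attributes.any(
            and_(Attributes.attribute == "color", Attributes.value == "red")))
        # level >= 2 (values are stored as text, so cast before comparing)
        .filter(Object.attributes.any(
            and_(Attributes.attribute == "level", cast(Attributes.value, Integer) >= 2)))
        # attribute X is defined, whatever its value
        .filter(Object.attributes.any(Attributes.attribute == "X"))
        .order_by(Object.name)
    )
    matching_names = [obj.name for obj in query.all()]

print(matching_names)
```

Each any() call becomes its own EXISTS subquery, so every condition can be satisfied by a different row of the attributes table. That is why chaining plain filters on a single join, as in the question, cannot match "color = red" and "level >= 2" at the same time.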