How to properly build a SELECT query on many fields from two joined tables, using their aliases in dot notation, with psycopg2

Is there a way, using psycopg2, to turn these identifiers for a PostgreSQL query:
total_query_fields = (
    'p.id',
    'p.name',
    'p.type',
    'p.price',
    'o.date',        # please, notice the 'o' alias here
    'o.transaction', # please, notice the 'o' alias here
    'p.warehouse',
    'p.location',
)

# they may get split into their own tables if necessary:
product_query_fields = ('id', 'name', 'type', 'price', 'warehouse', 'location',)
order_query_fields = ('date', 'transaction',)

into something like this:

import psycopg2
from psycopg2 import sql

myid = 100

sql_query = sql.SQL("""
    SELECT {fields} FROM product p
    INNER JOIN owner o ON p.id = o.product_id
    WHERE p.id = {jidx} AND (o.dateof_purchase IS NOT NULL
    OR o.state = 'checked_out' );"""
).format(
    fields = # there should be the readable ***magic***
    jidx = sql.Literal(myid)
)

?

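Even after reading up on my problem, I could not find a nice way to get my list of fields in dot notation. I guess something must be possible with two map() calls, sql.SQL('.').join(...) and sql.SQL(', ').join(...),
or something more elegant, such as using SELECT {}.{}...?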


Because for now I am stuck with this:
fields = sql.SQL(', ').join(map(sql.Identifier, total_query_fields)),

because it wraps each whole "table.field" sequence in double quotes, which is definitely invalid SQL:

# /!\ INVALID SQL QUERY /!\:
print(sql_query.as_string(conn))
# will print:
# SELECT "p.id", "p.name", "p.type", "p.price", "o.date", "o.transaction", "p.warehouse", "p.location" FROM product p
#    INNER JOIN owner o ON p.id = o.product_id
#    WHERE p.id = 100 AND (o.dateof_purchase IS NOT NULL
#    OR o.state = 'checked_out' );

Indeed, if I copy/paste that last query directly into my favorite PostgreSQL query tool:

ERROR:  column "p.id" does not exist
LINE 1: SELECT "p.id", "p.name", "p.type", "p.price", "o.date",...
               ^
HINT:  Perhaps you meant to reference the column "p.id" or the column "o.id".
SQL state: 42703
Character: 8

psycopg2 obviously raises the same error:

UndefinedColumn: column "p.id" does not exist
LINE 1: SELECT "p.id", "p.name", "p.type", "p.price", "o.date...
               ^
HINT:  Perhaps you meant to reference the column "p.id" or the column "o.id".

The documentation also states this clearly.

Versions:

psycopg2.__version__
 '2.9.1 (dt dec pq3 ext lo64)'

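If I understand correctly, I think the trick here is that sql.Identifier accepts one or more strings (*strings), so you can split each column name on the dot and pass both parts to sql.Identifier, which produces the desired "alias"."column" result.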

>>> i = sql.Identifier('a', 'col')
>>> i.strings
('a', 'col')
>>> conn = psycopg2.connect(database='test')
>>> cur = conn.cursor()
>>> i.as_string(cur)
'"a"."col"'

All the fields can be quoted like this:

fields = sql.SQL(', ').join(sql.Identifier(*f.split('.')) for f in total_query_fields)

sql_query = sql.SQL(
    """
    SELECT {fields} FROM product p
    INNER JOIN owner o ON p.id = o.product_id
    WHERE p.id = {jidx} AND (o.dateof_purchase IS NOT NULL
    OR o.state = 'checked_out' );"""
).format(fields=fields, jidx=sql.Literal(myid))

The resulting query (from cursor.mogrify) is

b'\n    SELECT "p"."id", "p"."name", "p"."type", "p"."price", "o"."date", "o"."transaction", "p"."warehouse", "p"."location" FROM product p\n    INNER JOIN owner o ON p.id = o.product_id\n    WHERE p.id = 100 AND (o.dateof_purchase IS NOT NULL\n    OR o.state = \'checked_out\' );'

If you prefer map over a generator expression, you can use itertools.starmap:

from itertools import starmap

fields = sql.SQL(', ').join(
    starmap(sql.Identifier, map(lambda f: f.split('.'), total_query_fields)))

I finally found a way (convoluted, but it works):

list(
    map(sql.SQL('.').join,
        zip(
            map(sql.Identifier, len(product_query_fields)*'p'),
            map(sql.Identifier, product_query_fields)
        )
    )
)

This evaluates to:

[Composed([Identifier('p'), SQL('.'), Identifier('id')]),
 Composed([Identifier('p'), SQL('.'), Identifier('name')]),
 Composed([Identifier('p'), SQL('.'), Identifier('type')]),
 Composed([Identifier('p'), SQL('.'), Identifier('price')]),
 Composed([Identifier('p'), SQL('.'), Identifier('warehouse')]),
 Composed([Identifier('p'), SQL('.'), Identifier('location')])]

So the following builds the complete identifier objects in dot notation; note the use of the zip() built-in:

product_table_name_or_alias = 'p'
products_composed = sql.SQL(', ').join(
    map(sql.SQL('.').join,
        zip(
            map(sql.Identifier, len(product_query_fields)*product_table_name_or_alias),
            map(sql.Identifier, product_query_fields)
        )
    )
)

products_composed
#Composed([
#    Composed([Identifier('p'), SQL('.'), Identifier('id')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('name')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('type')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('price')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('warehouse')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('location')])
#])
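
One caveat with multiplying the alias string by the number of fields: iterating a string yields single characters, so the trick only lines up for a one-character alias such as 'p'. The sketch below uses plain tuples (no psycopg2 needed) to show what zip() pairs up; the longer alias 'prod' and the shortened field tuple are hypothetical and only serve the illustration.

```python
from itertools import repeat

product_query_fields = ('id', 'name', 'type')

# A one-character alias repeated N times gives one character per field.
short_pairs = list(zip(len(product_query_fields) * 'p', product_query_fields))

# A longer alias is consumed character by character, so the pairs go wrong.
long_alias = 'prod'
broken_pairs = list(zip(len(product_query_fields) * long_alias, product_query_fields))

# itertools.repeat yields the whole alias each time, whatever its length.
fixed_pairs = list(zip(repeat(long_alias), product_query_fields))

print(short_pairs)   # [('p', 'id'), ('p', 'name'), ('p', 'type')]
print(broken_pairs)  # [('p', 'id'), ('r', 'name'), ('o', 'type')]
print(fixed_pairs)   # [('prod', 'id'), ('prod', 'name'), ('prod', 'type')]
```

Swapping the string multiplication for repeat(alias) inside the map(sql.Identifier, ...) call should therefore make the construction safe for full table names as well.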

The same for the joined table:

order_table_name_or_alias = 'o'
orders_composed = sql.SQL(', ').join(
    map(sql.SQL('.').join,
        zip(
            map(sql.Identifier, len(order_query_fields)*order_table_name_or_alias),
            map(sql.Identifier, order_query_fields)
        )
    )
)

orders_composed
#Composed([
#    Composed([Identifier('o'), SQL('.'), Identifier('date')]), SQL(', '),
#    Composed([Identifier('o'), SQL('.'), Identifier('transaction')])
#])

You cannot simply add the two objects together:

total_composed = products_composed + orders_composed

total_composed
#Composed([
#    Composed([Identifier('p'), SQL('.'), Identifier('id')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('name')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('type')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('price')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('warehouse')])
#    Composed([Identifier('o'), SQL('.'), Identifier('date')]), SQL(', '),
#    Composed([Identifier('o'), SQL('.'), Identifier('transaction')])
#])

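because, as you can see, the SQL(', ') between the two lists is missing. In the SELECT query this collapses the two neighbouring fields into "p"."warehouse""o"."date", which is obviously invalid and raises an error: missing FROM-clause entry for table "warehouse""o"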

It is also tempting to try appending one to the other, but...

products_composed.append(orders_composed)
# AttributeError: 'Composed' object has no attribute 'append'

So, back to basics: build the combined object and pass it to your query.

# Add the missing sql.SQL(', ') using a simple addition in between the 2 objects
total_composed = products_composed + sql.SQL(', ') + orders_composed
total_composed
#Composed([
#    Composed([Identifier('p'), SQL('.'), Identifier('id')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('name')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('type')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('price')]), SQL(', '),
#    Composed([Identifier('p'), SQL('.'), Identifier('warehouse')]), SQL(', '), # here it is!!!
#    Composed([Identifier('o'), SQL('.'), Identifier('date')]), SQL(', '),
#    Composed([Identifier('o'), SQL('.'), Identifier('transaction')])
#])


sql_query = sql.SQL("""
    SELECT {composed_fields} FROM product p
    INNER JOIN owner o ON p.id = o.product_id
    WHERE p.id = {jidx} AND (o.dateof_purchase IS NOT NULL
    OR o.state = 'checked_out' );"""
).format(
    composed_fields = total_composed,  # there should be the readable ***magic***
    jidx = sql.Literal(myid)
)

conn = psycopg2.connect(**DB_PARAMS)
print(sql_query.as_string(conn))
# prints (line breaks adjusted here for readability):
# SELECT "p"."id", "p"."name", "p"."type", "p"."price",
#        "p"."warehouse", "o"."date", "o"."transaction"
#      FROM product p
#     INNER JOIN owner o ON p.id = o.product_id
#     WHERE p.id = 100
#       AND (o.dateof_purchase IS NOT NULL
#        OR o.state = 'checked_out' );
# This finally works!

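This is nice because it copes with table names that contain uppercase letters or odd characters! (But sticking to lowercase names in Postgres is definitely better!)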

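A drawback I have not fixed yet: if you need the columns in a specific order in the query, it becomes cumbersome, because Composed objects do not behave like lists (which is what I expected)...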