How to properly build a SELECT query on many fields from two joined tables, using their aliases in dot notation, with psycopg2
Is there a way, using psycopg2, to turn these identifiers:
total_query_fields = (
'p.id',
'p.name',
'p.type',
'p.price',
'o.date', # please, notice the 'o' alias here
'o.transaction', # please, notice the 'o' alias here
'p.warehouse',
'p.location',
)
# they may get split into their own tables if necessary:
product_query_fields = ('id', 'name', 'type', 'price', 'warehouse', 'location',)
order_query_fields = ('date', 'transaction',)
into something like this:
import psycopg2
from psycopg2 import sql
myid = 100
sql_query = sql.SQL("""
SELECT {fields} FROM product p
INNER JOIN owner o ON p.id = o.product_id
WHERE p.id = {jidx} AND (o.dateof_purchase IS NOT NULL
OR o.state = 'checked_out' );"""
).format(
fields = # there should be the readable ***magic***
jidx = sql.Literal(myid)
)
?
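Even after reading up on my problem, I could not find a nice way to get my list of fields in dot notation. I guess something must be possible using two map() calls, sql.SQL('.').join(...) and sql.SQL(', ').join(...). Or something more elegant, for example using SELECT {}.{}...?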
Because at the moment I am stuck with:
fields = sql.SQL(', ').join(map(sql.Identifier, total_query_fields)),
since it wraps each whole "table.field" sequence in double quotes, which is certainly invalid in SQL:
# /!\ INVALID SQL QUERY /!\:
print(sql_query.as_string(conn))
# will print:
# SELECT "p.id", "p.name", "p.type", "p.price", "o.date", "o.transaction", "p.warehouse", "p.location" FROM product p
# INNER JOIN owner o ON p.id = o.product_id
# WHERE p.id = 100 AND (o.dateof_purchase IS NOT NULL
# OR o.state = 'checked_out' );
Indeed, if I copy/paste that last query directly into my favorite PostgreSQL query tool:
ERROR: column "p.id" does not exist
LINE 1: SELECT "p.id", "p.name", "p.type", "p.price", "o.date",...
^
HINT: Perhaps you meant to reference the column "p.id" or the column "o.id".
SQL state: 42703
Character: 8
psycopg2 obviously raises the same error:
UndefinedColumn: column "p.id" does not exist
LINE 1: SELECT "p.id", "p.name", "p.type", "p.price", "o.date...
^
HINT: Perhaps you meant to reference the column "p.id" or the column "o.id".
The documentation is also clear about this.
Version:
psycopg2.__version__
'2.9.1 (dt dec pq3 ext lo64)'
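If I've understood correctly, I think the trick here is that sql.Identifier accepts one or more strings (*strings), so you can split each column name on the dot and pass both parts to sql.Identifier, which produces the desired "alias"."column" result.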
>>> i = sql.Identifier('a', 'col')
>>> i.strings
('a', 'col')
>>> conn = psycopg2.connect(database='test')
>>> cur = conn.cursor()
>>> i.as_string(cur)
'"a"."col"'
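To see why the one-string and two-string spellings differ, here is a rough pure-Python model of PostgreSQL identifier quoting. It is only a sketch: quote_ident and qualified are hypothetical helpers written for illustration, not psycopg2's implementation, and they assume the standard rule that an identifier is wrapped in double quotes with any embedded double quote doubled.

```python
def quote_ident(name: str) -> str:
    """Quote one identifier: wrap in double quotes, double any embedded ones."""
    return '"' + name.replace('"', '""') + '"'


def qualified(*parts: str) -> str:
    """Quote each part separately and join with dots, like Identifier(*parts)."""
    return ".".join(quote_ident(p) for p in parts)


# One string: the dot ends up inside the quotes, so it is part of the name.
single = qualified("p.id")
# Two strings: the dot stays outside, so it separates alias and column.
split = qualified(*"p.id".split("."))

print(single)  # "p.id"
print(split)   # "p"."id"
```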
Quoting all the fields can be done like this:
fields = sql.SQL(', ').join(sql.Identifier(*f.split('.')) for f in total_query_fields)
sql_query = sql.SQL(
"""
SELECT {fields} FROM product p
INNER JOIN owner o ON p.id = o.product_id
WHERE p.id = {jidx} AND (o.dateof_purchase IS NOT NULL
OR o.state = 'checked_out' );"""
).format(fields=fields, jidx=sql.Literal(myid))
The resulting query (from cursor.mogrify) is
b'\n SELECT "p"."id", "p"."name", "p"."type", "p"."price", "o"."date", "o"."transaction", "p"."warehouse", "p"."location" FROM product p\n INNER JOIN owner o ON p.id = o.product_id\n WHERE p.id = 100 AND (o.dateof_purchase IS NOT NULL\n OR o.state = \'checked_out\' );'
If you prefer map over a generator expression, you can use itertools.starmap:
from itertools import starmap
fields = sql.SQL(', ').join(
starmap(sql.Identifier, map(lambda f: f.split('.'), total_query_fields)))
I finally found a way (convoluted, but it works):
list(
map(sql.SQL('.').join,
zip(
map(sql.Identifier, len(product_query_fields)*'p'),
map(sql.Identifier, product_query_fields))
)
)
This prints:
[Composed([Identifier('p'), SQL('.'), Identifier('id')]),
Composed([Identifier('p'), SQL('.'), Identifier('name')]),
Composed([Identifier('p'), SQL('.'), Identifier('type')]),
Composed([Identifier('p'), SQL('.'), Identifier('price')]),
Composed([Identifier('p'), SQL('.'), Identifier('warehouse')]),
Composed([Identifier('p'), SQL('.'), Identifier('location')])]
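One caveat with the snippet above: len(product_query_fields)*'p' evaluates to the string 'pppppp', and map() then walks over its characters. That happens to work for a one-letter alias, but it would pair the wrong prefixes with the columns for a longer name. The sketch below shows the difference in plain Python, using a hypothetical alias 'prod', with itertools.repeat as one possible way around it.

```python
from itertools import repeat

columns = ("id", "name", "type")
alias = "prod"  # hypothetical multi-character alias

# String multiplication: zip() walks the characters of 'prodprodprod'.
broken = list(zip(len(columns) * alias, columns))

# repeat() yields the whole alias every time; zip() stops when columns ends.
fixed = list(zip(repeat(alias), columns))

print(broken)  # [('p', 'id'), ('r', 'name'), ('o', 'type')]
print(fixed)   # [('prod', 'id'), ('prod', 'name'), ('prod', 'type')]
```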
So the following builds the complete dot-notation identifier objects; note the use of the zip() built-in:
product_table_name_or_alias = 'p'
products_composed = sql.SQL(', ').join(
    map(sql.SQL('.').join,
        zip(
            # repeat the whole alias once per field; multiplying the string
            # itself would only work for a one-character alias
            map(sql.Identifier, [product_table_name_or_alias] * len(product_query_fields)),
            map(sql.Identifier, product_query_fields)
        )
    )
)
products_composed
#Composed([
# Composed([Identifier('p'), SQL('.'), Identifier('id')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('name')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('type')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('price')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('warehouse')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('location')])
#])
Same thing for the joined table:
order_table_name_or_alias = 'o'
orders_composed = sql.SQL(', ').join(
    map(sql.SQL('.').join,
        zip(
            # repeat the whole alias once per field (see above)
            map(sql.Identifier, [order_table_name_or_alias] * len(order_query_fields)),
            map(sql.Identifier, order_query_fields)
        )
    )
)
orders_composed
#Composed([
# Composed([Identifier('o'), SQL('.'), Identifier('date')]), SQL(', '),
# Composed([Identifier('o'), SQL('.'), Identifier('transaction')])
#])
You can't just magically add these two objects together:
total_composed = products_composed + orders_composed
total_composed
#Composed([
# Composed([Identifier('p'), SQL('.'), Identifier('id')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('name')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('type')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('price')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('warehouse')])
# Composed([Identifier('o'), SQL('.'), Identifier('date')]), SQL(', '),
# Composed([Identifier('o'), SQL('.'), Identifier('transaction')])
#])
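because, as you can see, it simply misses the SQL(', ') between the two lists. In the SELECT query this collapses the two adjacent fields into "p"."warehouse""o"."date", which is obviously invalid and raises an error: missing FROM-clause entry for table "warehouse""o"
It is also tempting to try appending one to the other, but...
products_composed.append(orders_composed)
# AttributeError: 'Composed' object has no attribute 'append'
So, back to basics: pass the resulting composed object to your query: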
# Add the missing sql.SQL(', ') using a simple addition in between the 2 objects
total_composed = products_composed + sql.SQL(', ') + orders_composed
total_composed
#Composed([
# Composed([Identifier('p'), SQL('.'), Identifier('id')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('name')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('type')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('price')]), SQL(', '),
# Composed([Identifier('p'), SQL('.'), Identifier('warehouse')]), SQL(', '), # here it is!!!
# Composed([Identifier('o'), SQL('.'), Identifier('date')]), SQL(', '),
# Composed([Identifier('o'), SQL('.'), Identifier('transaction')])
#])
sql_query = sql.SQL("""
SELECT {composed_fields} FROM product p
INNER JOIN owner o ON p.id = o.product_id
WHERE p.id = {jidx} AND (o.dateof_purchase IS NOT NULL
OR o.state = 'checked_out' );"""
).format(
    composed_fields=total_composed,  # the composed, dot-qualified field list
    jidx=sql.Literal(myid)
)
conn = psycopg2.connect(**DB_PARAMS)
sql_query.as_string(conn)
SELECT "p"."id", "p"."name", "p"."type", "p"."price",
"p"."warehouse", "o"."date", "o"."transaction"
FROM product p
INNER JOIN owner o ON p.id = o.product_id
WHERE p.id = 100
AND (o.dateof_purchase IS NOT NULL
OR o.state = 'checked_out' ); # This finally works!
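This is nice because it copes with table names that contain uppercase letters or odd characters! (Though sticking to lowercase names in Postgres is definitely better.)
A drawback I have not fixed yet: if you need the columns in a specific order in the query, this becomes awkward to handle, because Composed objects do not behave like lists (which is what I would have expected)...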