在 python 中使用 psycopg2 进行参数化 SQL select 语句
make parameterized SQL select statement with psycopg2 in python
我从 psycopg2
API 中学到了如何将参数传递给 SQL 查询,因此我们可以轻松地使用参数化方式操作 SQL 语句。因此,带有 %(param)s
的 stringify 参数可以做到这一点。我想知道我们是否可以将一个通用的参数化 SQL 语句封装在 python 函数中,这样我们就可以通过提供任意参数值来调用 python 函数,这些值将被 [=60= 使用] 语句,最终它将作为 运行ning 尽可能多的 SQL 语句。但是,我不知道我们如何制作参数化的 SQL select 语句,因为我们想从本地数据库中检索的项目每次都可能不同,所以如果 select 语句可以参数化。我们怎样才能做到这一点? python 中的 psycopg2
有什么方法可以做到这一点吗?如何做到这一点?有什么想法吗?
db table:
这是用于可重现目的的示例数据库 table:
CREATE TABLE trans_tbl(
date_received DATE,
pk_est VARCHAR,
grd_name VARCHAR,
cl_val SMALLINT,
quant_received NUMERIC,
mg_fb_price NUMERIC,
freight NUMERIC,
standard_price NUMERIC,
grd_delv_cost NUMERIC,
order_type VARCHAR,
pk_name VARCHAR,
item_type VARCHAR,
waiting_days NUMERIC,
item_name VARCHAR,
mk_price_variance NUMERIC,
);
并且,这里是示例查询列表,我需要一个参数化 SQL 查询语句(select
、where
子句应参数化):
示例查询 1
SELECT
date_trunc('week', date_received::date) AS received_week,
cl_val,
item_type,
ROUND(ROUND(SUM(quant_received * mg_fb_price)::numeric,4) / SUM(quant_received),4) AS price_1,
ROUND(ROUND(SUM(quant_received * grd_delv_cost)::numeric,4) / SUM(quant_received),4) AS dv_price,
FROM trans_tbl
GROUP BY received_week,cl_val,item_type
ORDER BY received_week;
示例查询 2:
SELECT
date_trunc('month', date_received) AS received_month,
ROUND(ROUND(SUM(quant_received * standard_price)::numeric,4) / SUM(quant_received),4) AS mk_price,
ROUND(ROUND(SUM(quant_received * mg_fb_price)::numeric,4) / SUM(quant_received),4) AS price,
ROUND(ROUND(SUM(quant_received * mk_price_variance)::numeric,4) / SUM(quant_received),4) AS fob_market_price_variance,
ROUND(ROUND(SUM(quant_received * grd_delv_cost)::numeric,4) / SUM(quant_received),4) AS dv_cost,
ROUND(ROUND(SUM(quant_received * freight)::numeric,4) / SUM(quant_received),4) AS weight_avg,
FROM trans_tbl
示例查询 3:
SELECT
date_trunc('week', date_received::date) AS received_week,
grd_name,
pk_name,
pk_est,
TO_CHAR(SUM(quant_received), '999G999G990D') AS received_amt
FROM trans_tbl
我想做什么 我想要一个通用的参数化 SQL 语句,这样我就可以通过任意传递参数值来 运行 SQL 语句,这样它就可以与运行ning 以上三个SQL 语句分开。有什么方法可以用 python 中的 psycopg2
实现吗?是否可以做到这一点?有什么想法吗?
更新:
也许我的尝试不太可行,所以我对可能的可行方法持开放态度,至少可以减轻痛苦。如果我想要实现的目标不太可行,我可以采取什么有效的方法呢?有什么想法吗?
首先,这是您之前的问题 . You should have just continued the discussion there. As I stated there it is possible to do what you using the sql 来自 psycopg2 的模块的副本。作为我的一个应用程序的示例:
class NotificationReport():
"""Builds a query for finding task notifications.
Use form_choices passed in to modify the select query for task
notifications using psycopg2.sql module. Filter on status which is some
combination of notify_expired and notify_cancelled.
"""
def __init__(self, form_choices):
self.id = "notification_report"
self.form_choices = form_choices
def returnQuery(self):
flds, defaults = data.fetchFields(data.TaskNotification)
base_sql = sql.SQL("""SELECT
task_title, {}
FROM
tasks
JOIN
task_priority AS tp
ON
tasks. task_priority_fk= tp.priority_id
JOIN
task_type AS tt
ON
tasks.task_type_fk = tt.task_type_id
LEFT JOIN
task_notification AS tn
ON
tasks.task_id = tn.task_id_fk
""").format(sql.SQL(",").join(map(sql.Identifier, flds)))
f_choices = self.form_choices
and_sql = None
ops_list = []
if f_choices:
for choice in f_choices:
if choice.get("status"):
status = choice["status"]
status_dict = {"open": ("notify_expired = 'f' "),
"expired": ("notify_expired = 't' "),
}
if status == "all":
pass
else:
ops = sql.SQL(status_dict[status])
ops_list.append(ops)
if ops_list:
and_sql = sql.Composed([base_sql, sql.SQL(" AND ")])
additional_and = sql.SQL(" AND ").join(ops_list)
ops_sql = sql.Composed([and_sql, additional_and])
orderby_sql = sql.SQL("""ORDER BY
task_title""")
if and_sql:
combined_sql = sql.Composed([ops_sql, orderby_sql])
else:
combined_sql = sql.Composed([base_sql, orderby_sql])
return combined_sql
输出。首先没有向报告提供参数:
SELECT
task_title, "task_id_fk","before_value","before_interval","every_value","every_interval","notify_note","notify_id","notify_expired"
FROM
tasks
JOIN
task_priority AS tp
ON
tasks. task_priority_fk= tp.priority_id
JOIN
task_type AS tt
ON
tasks.task_type_fk = tt.task_type_id
LEFT JOIN
task_notification AS tn
ON
tasks.task_id = tn.task_id_fk
ORDER BY
task_title
然后状态:
SELECT
task_title, "task_id_fk","before_value","before_interval","every_value","every_interval","notify_note","notify_id","notify_expired"
FROM
tasks
JOIN
task_priority AS tp
ON
tasks. task_priority_fk= tp.priority_id
JOIN
task_type AS tt
ON
tasks.task_type_fk = tt.task_type_id
LEFT JOIN
task_notification AS tn
ON
tasks.task_id = tn.task_id_fk
AND notify_expired = 'f' ORDER BY
task_title
我从 psycopg2
API 中学到了如何将参数传递给 SQL 查询,因此我们可以轻松地使用参数化方式操作 SQL 语句。因此,带有 %(param)s
的 stringify 参数可以做到这一点。我想知道我们是否可以将一个通用的参数化 SQL 语句封装在 python 函数中,这样我们就可以通过提供任意参数值来调用 python 函数,这些值将被 [=60= 使用] 语句,最终它将作为 运行ning 尽可能多的 SQL 语句。但是,我不知道我们如何制作参数化的 SQL select 语句,因为我们想从本地数据库中检索的项目每次都可能不同,所以如果 select 语句可以参数化。我们怎样才能做到这一点? python 中的 psycopg2
有什么方法可以做到这一点吗?如何做到这一点?有什么想法吗?
db table:
这是用于可重现目的的示例数据库 table:
CREATE TABLE trans_tbl(
date_received DATE,
pk_est VARCHAR,
grd_name VARCHAR,
cl_val SMALLINT,
quant_received NUMERIC,
mg_fb_price NUMERIC,
freight NUMERIC,
standard_price NUMERIC,
grd_delv_cost NUMERIC,
order_type VARCHAR,
pk_name VARCHAR,
item_type VARCHAR,
waiting_days NUMERIC,
item_name VARCHAR,
mk_price_variance NUMERIC,
);
并且,这里是示例查询列表,我需要一个参数化 SQL 查询语句(select
、where
子句应参数化):
示例查询 1
SELECT
date_trunc('week', date_received::date) AS received_week,
cl_val,
item_type,
ROUND(ROUND(SUM(quant_received * mg_fb_price)::numeric,4) / SUM(quant_received),4) AS price_1,
ROUND(ROUND(SUM(quant_received * grd_delv_cost)::numeric,4) / SUM(quant_received),4) AS dv_price,
FROM trans_tbl
GROUP BY received_week,cl_val,item_type
ORDER BY received_week;
示例查询 2:
SELECT
date_trunc('month', date_received) AS received_month,
ROUND(ROUND(SUM(quant_received * standard_price)::numeric,4) / SUM(quant_received),4) AS mk_price,
ROUND(ROUND(SUM(quant_received * mg_fb_price)::numeric,4) / SUM(quant_received),4) AS price,
ROUND(ROUND(SUM(quant_received * mk_price_variance)::numeric,4) / SUM(quant_received),4) AS fob_market_price_variance,
ROUND(ROUND(SUM(quant_received * grd_delv_cost)::numeric,4) / SUM(quant_received),4) AS dv_cost,
ROUND(ROUND(SUM(quant_received * freight)::numeric,4) / SUM(quant_received),4) AS weight_avg,
FROM trans_tbl
示例查询 3:
SELECT
date_trunc('week', date_received::date) AS received_week,
grd_name,
pk_name,
pk_est,
TO_CHAR(SUM(quant_received), '999G999G990D') AS received_amt
FROM trans_tbl
我想做什么 我想要一个通用的参数化 SQL 语句,这样我就可以通过任意传递参数值来 运行 SQL 语句,这样它就可以与运行ning 以上三个SQL 语句分开。有什么方法可以用 python 中的 psycopg2
实现吗?是否可以做到这一点?有什么想法吗?
更新:
也许我的尝试不太可行,所以我对可能的可行方法持开放态度,至少可以减轻痛苦。如果我想要实现的目标不太可行,我可以采取什么有效的方法呢?有什么想法吗?
首先,这是您之前的问题
class NotificationReport():
"""Builds a query for finding task notifications.
Use form_choices passed in to modify the select query for task
notifications using psycopg2.sql module. Filter on status which is some
combination of notify_expired and notify_cancelled.
"""
def __init__(self, form_choices):
self.id = "notification_report"
self.form_choices = form_choices
def returnQuery(self):
flds, defaults = data.fetchFields(data.TaskNotification)
base_sql = sql.SQL("""SELECT
task_title, {}
FROM
tasks
JOIN
task_priority AS tp
ON
tasks. task_priority_fk= tp.priority_id
JOIN
task_type AS tt
ON
tasks.task_type_fk = tt.task_type_id
LEFT JOIN
task_notification AS tn
ON
tasks.task_id = tn.task_id_fk
""").format(sql.SQL(",").join(map(sql.Identifier, flds)))
f_choices = self.form_choices
and_sql = None
ops_list = []
if f_choices:
for choice in f_choices:
if choice.get("status"):
status = choice["status"]
status_dict = {"open": ("notify_expired = 'f' "),
"expired": ("notify_expired = 't' "),
}
if status == "all":
pass
else:
ops = sql.SQL(status_dict[status])
ops_list.append(ops)
if ops_list:
and_sql = sql.Composed([base_sql, sql.SQL(" AND ")])
additional_and = sql.SQL(" AND ").join(ops_list)
ops_sql = sql.Composed([and_sql, additional_and])
orderby_sql = sql.SQL("""ORDER BY
task_title""")
if and_sql:
combined_sql = sql.Composed([ops_sql, orderby_sql])
else:
combined_sql = sql.Composed([base_sql, orderby_sql])
return combined_sql
输出。首先没有向报告提供参数:
SELECT
task_title, "task_id_fk","before_value","before_interval","every_value","every_interval","notify_note","notify_id","notify_expired"
FROM
tasks
JOIN
task_priority AS tp
ON
tasks. task_priority_fk= tp.priority_id
JOIN
task_type AS tt
ON
tasks.task_type_fk = tt.task_type_id
LEFT JOIN
task_notification AS tn
ON
tasks.task_id = tn.task_id_fk
ORDER BY
task_title
然后状态:
SELECT
task_title, "task_id_fk","before_value","before_interval","every_value","every_interval","notify_note","notify_id","notify_expired"
FROM
tasks
JOIN
task_priority AS tp
ON
tasks. task_priority_fk= tp.priority_id
JOIN
task_type AS tt
ON
tasks.task_type_fk = tt.task_type_id
LEFT JOIN
task_notification AS tn
ON
tasks.task_id = tn.task_id_fk
AND notify_expired = 'f' ORDER BY
task_title