在 python 中使用 psycopg2 进行参数化 SQL select 语句

make parameterized SQL select statement with psycopg2 in python

我从 psycopg2 API 中学到了如何将参数传递给 SQL 查询,因此我们可以轻松地使用参数化方式操作 SQL 语句。因此,带有 %(param)s 的 stringify 参数可以做到这一点。我想知道我们是否可以将一个通用的参数化 SQL 语句封装在 python 函数中,这样我们就可以通过提供任意参数值来调用 python 函数,这些值将被 [=60= 使用] 语句,最终它将作为 运行ning 尽可能多的 SQL 语句。但是,我不知道我们如何制作参数化的 SQL select 语句,因为我们想从本地数据库中检索的项目每次都可能不同,所以如果 select 语句可以参数化。我们怎样才能做到这一点? python 中的 psycopg2 有什么方法可以做到这一点吗?如何做到这一点?有什么想法吗?

db table:

这是用于可重现目的的示例数据库 table:

CREATE TABLE trans_tbl(
date_received DATE,
pk_est VARCHAR,
grd_name VARCHAR,
cl_val SMALLINT,
quant_received NUMERIC,
mg_fb_price NUMERIC,
freight NUMERIC,
standard_price NUMERIC,
grd_delv_cost NUMERIC,
order_type VARCHAR,
pk_name VARCHAR,
item_type VARCHAR,
waiting_days NUMERIC,
item_name VARCHAR,
mk_price_variance NUMERIC,
);

并且,这里是示例查询列表,我需要一个参数化 SQL 查询语句(selectwhere 子句应参数化):

示例查询 1

SELECT
    date_trunc('week', date_received::date) AS received_week,
    cl_val,
    item_type,
    ROUND(ROUND(SUM(quant_received * mg_fb_price)::numeric,4) / SUM(quant_received),4) AS price_1,
    ROUND(ROUND(SUM(quant_received * grd_delv_cost)::numeric,4) / SUM(quant_received),4) AS dv_price,
FROM trans_tbl
GROUP BY received_week,cl_val,item_type
ORDER BY received_week;

示例查询 2:

SELECT
    date_trunc('month', date_received) AS received_month,
    ROUND(ROUND(SUM(quant_received * standard_price)::numeric,4) / SUM(quant_received),4) AS mk_price,
    ROUND(ROUND(SUM(quant_received * mg_fb_price)::numeric,4) / SUM(quant_received),4) AS price,
    ROUND(ROUND(SUM(quant_received * mk_price_variance)::numeric,4) / SUM(quant_received),4) AS fob_market_price_variance,
    ROUND(ROUND(SUM(quant_received * grd_delv_cost)::numeric,4) / SUM(quant_received),4) AS dv_cost,
    ROUND(ROUND(SUM(quant_received * freight)::numeric,4) / SUM(quant_received),4) AS weight_avg,
FROM trans_tbl

示例查询 3:

SELECT
    date_trunc('week', date_received::date) AS received_week,
    grd_name,
    pk_name,
    pk_est,
    TO_CHAR(SUM(quant_received), '999G999G990D') AS received_amt
FROM trans_tbl

我想做什么 我想要一个通用的参数化 SQL 语句,这样我就可以通过任意传递参数值来 运行 SQL 语句,这样它就可以与运行ning 以上三个SQL 语句分开。有什么方法可以用 python 中的 psycopg2 实现吗?是否可以做到这一点?有什么想法吗?

更新:

也许我的尝试不太可行,所以我对可能的可行方法持开放态度,至少可以减轻痛苦。如果我想要实现的目标不太可行,我可以采取什么有效的方法呢?有什么想法吗?

首先,这是您之前的问题 . You should have just continued the discussion there. As I stated there it is possible to do what you using the sql 来自 psycopg2 的模块的副本。作为我的一个应用程序的示例:

class NotificationReport():
    """Builds a query for finding task notifications.

    Use form_choices passed in to modify the select query for task
    notifications using psycopg2.sql module. Filter on status which is some
    combination of notify_expired and notify_cancelled.
    """

    def __init__(self, form_choices):
        self.id = "notification_report"
        self.form_choices = form_choices

    def returnQuery(self):
        flds, defaults = data.fetchFields(data.TaskNotification)
        base_sql = sql.SQL("""SELECT
            task_title, {}
        FROM
            tasks
        JOIN
            task_priority AS tp
        ON
            tasks. task_priority_fk= tp.priority_id
        JOIN
            task_type AS tt
        ON
            tasks.task_type_fk = tt.task_type_id
        LEFT JOIN
            task_notification AS tn
        ON
            tasks.task_id = tn.task_id_fk

        """).format(sql.SQL(",").join(map(sql.Identifier, flds)))
        f_choices = self.form_choices
        and_sql = None
        ops_list = []
        if f_choices:
            for choice in f_choices:
                if choice.get("status"):
                    status = choice["status"]
                    status_dict = {"open": ("notify_expired = 'f' "),
                                   "expired": ("notify_expired = 't' "),
                                   }
                    if status == "all":
                        pass
                    else:
                        ops = sql.SQL(status_dict[status])
                        ops_list.append(ops)
        if ops_list:
            and_sql = sql.Composed([base_sql, sql.SQL(" AND ")])
            additional_and = sql.SQL(" AND ").join(ops_list)
            ops_sql = sql.Composed([and_sql, additional_and])
        orderby_sql = sql.SQL("""ORDER BY
            task_title""")
        if and_sql:
            combined_sql = sql.Composed([ops_sql, orderby_sql])
        else:
            combined_sql = sql.Composed([base_sql, orderby_sql])

        return combined_sql

输出。首先没有向报告提供参数:

SELECT
    task_title, "task_id_fk","before_value","before_interval","every_value","every_interval","notify_note","notify_id","notify_expired"
FROM
    tasks
JOIN
    task_priority AS tp
ON
    tasks. task_priority_fk= tp.priority_id
JOIN
    task_type AS tt
ON
    tasks.task_type_fk = tt.task_type_id
LEFT JOIN
    task_notification AS tn
ON
    tasks.task_id = tn.task_id_fk

ORDER BY
    task_title

然后状态:

SELECT
    task_title, "task_id_fk","before_value","before_interval","every_value","every_interval","notify_note","notify_id","notify_expired"
FROM
    tasks
JOIN
    task_priority AS tp
ON
    tasks. task_priority_fk= tp.priority_id
JOIN
    task_type AS tt
ON
    tasks.task_type_fk = tt.task_type_id
LEFT JOIN
    task_notification AS tn
ON
    tasks.task_id = tn.task_id_fk

    AND notify_expired = 'f' ORDER BY
    task_title