Assign/Substitute PostgresSQL 的 SQL 语句中的值列表使用 Python

Assign/Substitute a list of values in a SQL statement for PostgreSQL using Python

我有一个巨大的 table,下面有很多分区 table,但我只想 query/update 几个(18)个分区。所以我必须准备 运行 18 python 脚本(因为我必须根据我们的 DBA 说明在插入每个分区 table 后创建并关闭连接,因为每个 period 包含数百万条记录)到 运行 我的插入语句。 所以我尝试了两种方法来解决这个问题: 一种是 运行 一个 for 循环来替换我的 SQL 语句中的句点列表,该语句涉及编写两个 python 脚本:

import os
import logging
#import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

list_of_periods = [
    {'period': '1940'},
    {'period': '1941_1945'},
    {'period': '1946_1950'},
    {'period': '1951_1955'},
    {'period': '1956_1960'},
    {'period': '1961_1965'},
    {'period': '1966_1970'},
    {'period': '1971_1975'},
    {'period': '1976_1980'},
    {'period': '1981_1985'},
    {'period': '1986_1990'},
    {'period': '1991_1995'},
    {'period': '1996_2000'},
    {'period': '2001_2005'},
    {'period': '2006_2010'},
    {'period': '2011_2015'},
    {'period': '2016_2020'},
    {'period': '2021_2025'}
]

if __name__ == "__main__":
    logging.info("Starting test process")
    logging.info("  cpc = {}".format(cpc_name) + '\n')
    for period in list_of_periods:
        os.system('python sample_segment.py')


########
sample_segment.py

import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

if __name__ == "__main__":
    logging.info("Starting test process")
    logging.info("  cpc = {}".format(cpc_name) + '\n')
    connection = psycopg2.connect(user        = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
                                  password    = os.environ.get("DATABASE_PASS", "pass"),
                                  host        = os.environ.get("DATABASE_HOST", "psql.silver.com"),
                                  port        = 5432,
                                  dbname      = os.environ.get("DATABASE_NAME", "psql_db"),
                                  options     = "-c search_path=DATAVANT_O")                    
    with connection.cursor() as cursor:
        logging.info(str(connection.get_dsn_parameters()) + '\n')
        cursor.execute("SELECT version();")
        connection.commit()
        conn = cursor.fetchone()
        logging.info("You are connected to - " + str(conn))
        tempb = '''
        INSERT INTO DATAVANT_STG_O.mortality_index_{period}           
        SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period}; '''
        logging.info("Performing Insert Operation")
        cursor.execute(tempb)
        connection.commit()
        count = cursor.rowcount
        logging.info(str(count) + " - count for the period:  {period}")
        logging.info(" - count for the period:  {period}")
        connection.close()
        print("PostgreSQL connection is closed")

显然它没有用,我怀疑我的 python 编程技能当然或者我正在寻找类似 os.sytem.sql_with_parameters() 的东西 但无论如何我的意图是转下面的语句

INSERT INTO DATAVANT_STG_O.mortality_index_{period}           
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period};

INTO

INSERT INTO DATAVANT_STG_O.mortality_index_1940           
SELECT * FROM DATAVANT_STG_O.tmp_mortality_1940;

INSERT INTO DATAVANT_STG_O.mortality_index_1941_1945           
SELECT * FROM DATAVANT_STG_O.tmp_mortality_1941_1945;

etc...for 18 periods

我尝试的第二种方法是在我的 python 脚本中读取 YAML 文件。

import os
import logging
import psycopg2
import socket
import yaml
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

with open(r'/home/SILVER/user/test/periods.yaml') as file:
    YEARS = yaml.load(file, Loader=yaml.FullLoader)
    print("load yaml file" + '\n')
    print(YEARS)


if __name__ == "__main__":
    logging.info("Starting test process")
    logging.info("  cpc = {}".format(cpc_name) + '\n')
    connection = psycopg2.connect(user        = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
                                  password    = os.environ.get("DATABASE_PASS", "pass"),
                                  host        = os.environ.get("DATABASE_HOST", "psql.host.com"),
                                  port        = 5432,
                                  dbname      = os.environ.get("DATABASE_NAME", "psql_db"),
                                  options     = "-c search_path=DATAVANT_O")                    
    with connection.cursor() as cursor:
        logging.info(str(connection.get_dsn_parameters()) + '\n')
        cursor.execute("SELECT version();")
        connection.commit()
        conn = cursor.fetchone()
        logging.info("You are connected to - " + str(conn))
        tempb = '''
        INSERT INTO DATAVANT_STG_O.mortality_index_YEARS[periods]
        SELECT * FROM DATAVANT_STG_O.tmp_mortality_YEARS[periods]; '''
        cursor.execute(tempb)
        logging.info("Performing Insert Operation")
        connection.commit()
        count = cursor.rowcount
        logging.info(str(count) + " - count for the period:  YEARS[periods]")
        logging.info(" - count for the period:  YEARS[periods]")
        connection.close()
        print("PostgreSQL connection is closed")

##YAML 文件

---
periods:

  - 1940
  - 1941_1945
  - 1946_1950
  - 1951_1955
  - 1956_1960
  - 1961_1965
  - 1966_1970
  - 1971_1975
  - 1976_1980
  - 1981_1985
  - 1986_1990
  - 1991_1995
  - 1996_2000
  - 2001_2005
  - 2006_2010
  - 2011_2015
  - 2016_2020
  - 2021_2025

这是我了解到无法将来自 YAML 文件的值分配给 python 文件中的 SQL 语句的部分。

我对这个大问题深表歉意,但我只想说清楚并提供尽可能多的信息。

如果有任何建议可以帮助我使用上述任何一种方法解决此问题,我将不胜感激,或者随时向我推荐一种新方法,谢谢!

##下面是输出,当我尝试 Mike 的方法时:

import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

periods = ['1940']
periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2026, 5)])

for period in periods:
    # Do your psycopg2 connection here and get your cursor
    connection = psycopg2.connect(user        = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
                                  password    = os.environ.get("DATABASE_PASS", "pass"),
                                  host        = os.environ.get("DATABASE_HOST", "psql.silver.com"),
                                  port        = 5432,
                                  dbname      = os.environ.get("DATABASE_NAME", "psql_db"),
                                  options     = "-c search_path=DATAVANT_O")                    
    with connection.cursor() as cursor:
        logging.info(str(connection.get_dsn_parameters()) + '\n')
        cursor.execute("SELECT version();")
        connection.commit()
        conn = cursor.fetchone()
        logging.info("You are connected to - " + str(conn))
        cursor.execute("""
        SELECT COUNT(*) FROM datavant_stg_o.mortality_index_{};""". format(period, period)
        )
        # Commit and close your connection here
        connection.commit()
        count = cursor.rowcount
        logging.info("Count for the period {} is: " . format(period, period) + str(count) +  '\n')
        connection.close()
        print("PostgreSQL connection is closed" + "\n")

(目前我只是尝试执行计数(*)来检查功能) 所以我期望分区的计数为:

SELECT COUNT(*) FROM datavant_stg_o.mortality_index_1940 - 1001066
SELECT COUNT(*) FROM datavant_stg_o.mortality_index_1941_1945 - 1713850
etc which are the original counts when i query from pgAdmin

但我得到的输出是

Count for the period 1940 is: 1, 
Count for the period 1941_1945 is: 1 etc,

跟引号有关系吗?

如果您使用的是 python 3.6 或更高版本:

periods = ['1940']
periods.extend([f'{i}_{i + 4}' for i in range(1941, 2026, 5)])

for period in periods:
    # Do your psycopg2 connection here and get your cursor
    cursor.execute(f"""
      INSERT INTO DATAVANT_STG_O.mortality_index_{period}
      SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period};""")
    # Commit and close your connection here

较早的 python:

periods = ['1940']
periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2026, 5)])

for period in periods:
    # Do your psycopg2 connection here and get your cursor
    cursor.execute("""
      INSERT INTO DATAVANT_STG_O.mortality_index_{}
      SELECT * FROM DATAVANT_STG_O.tmp_mortality_{};""". format(period, period)
    )
    # Commit and close your connection here