使用 Python 为 PostgreSQL 的 SQL 语句赋值/替换一组值
Assign/Substitute a list of values in a SQL statement for PostgreSQL using Python
我有一张巨大的表,下面有很多分区表,但我只想查询/更新其中的几个(18 个)分区。因此我不得不准备并运行 18 个 Python 脚本来执行我的插入语句(按照我们 DBA 的要求,每向一个分区表插入完数据后都必须新建并关闭连接,因为每个 period 都包含数百万条记录)。
所以我尝试了两种方法来解决这个问题:
第一种是运行一个 for 循环,把 period 列表中的值逐个替换进我的 SQL 语句,这需要编写两个 Python 脚本:
# Driver script: meant to launch sample_segment.py once for each period label.
import os
import logging
#import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
# Host name of the machine running the script; only used in the log line below.
cpc_name = socket.gethostname()
# The 18 period labels, each wrapped in a single-key dict.
list_of_periods = [
{'period': '1940'},
{'period': '1941_1945'},
{'period': '1946_1950'},
{'period': '1951_1955'},
{'period': '1956_1960'},
{'period': '1961_1965'},
{'period': '1966_1970'},
{'period': '1971_1975'},
{'period': '1976_1980'},
{'period': '1981_1985'},
{'period': '1986_1990'},
{'period': '1991_1995'},
{'period': '1996_2000'},
{'period': '2001_2005'},
{'period': '2006_2010'},
{'period': '2011_2015'},
{'period': '2016_2020'},
{'period': '2021_2025'}
]
if __name__ == "__main__":
logging.info("Starting test process")
logging.info(" cpc = {}".format(cpc_name) + '\n')
# NOTE(review): the loop variable `period` is never handed to the child
# process -- the command string below is identical on every iteration, so
# sample_segment.py cannot know which period to load. Pass it explicitly
# (e.g. as a command-line argument that the child reads from sys.argv).
for period in list_of_periods:
os.system('python sample_segment.py')
########
sample_segment.py
# Worker script: connects to PostgreSQL, runs one INSERT ... SELECT, logs the
# cursor's row count and closes the connection.
import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()
if __name__ == "__main__":
logging.info("Starting test process")
logging.info(" cpc = {}".format(cpc_name) + '\n')
# Connection settings are read from environment variables, with hard-coded
# fallbacks when a variable is not set.
connection = psycopg2.connect(user = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
password = os.environ.get("DATABASE_PASS", "pass"),
host = os.environ.get("DATABASE_HOST", "psql.silver.com"),
port = 5432,
dbname = os.environ.get("DATABASE_NAME", "psql_db"),
options = "-c search_path=DATAVANT_O")
with connection.cursor() as cursor:
logging.info(str(connection.get_dsn_parameters()) + '\n')
cursor.execute("SELECT version();")
connection.commit()
# Despite its name, `conn` holds the row returned by SELECT version().
conn = cursor.fetchone()
logging.info("You are connected to - " + str(conn))
# NOTE(review): this is an ordinary string -- it is not an f-string and
# .format() is never called on it -- so the text "{period}" is sent to the
# server literally. No `period` variable is defined in this script either.
tempb = '''
INSERT INTO DATAVANT_STG_O.mortality_index_{period}
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period}; '''
logging.info("Performing Insert Operation")
cursor.execute(tempb)
connection.commit()
# After an INSERT, rowcount is the number of rows the statement affected.
count = cursor.rowcount
# NOTE(review): "{period}" is not substituted in these log messages either.
logging.info(str(count) + " - count for the period: {period}")
logging.info(" - count for the period: {period}")
connection.close()
print("PostgreSQL connection is closed")
显然这行不通。可能是我的 Python 编程水平有限,又或者我其实是在找类似 os.system.sql_with_parameters() 这样的东西。
但无论如何,我的意图是把下面的语句
INSERT INTO DATAVANT_STG_O.mortality_index_{period}
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period};
转换为:
INSERT INTO DATAVANT_STG_O.mortality_index_1940
SELECT * FROM DATAVANT_STG_O.tmp_mortality_1940;
INSERT INTO DATAVANT_STG_O.mortality_index_1941_1945
SELECT * FROM DATAVANT_STG_O.tmp_mortality_1941_1945;
etc...for 18 periods
我尝试的第二种方法是在我的 python 脚本中读取 YAML 文件。
# Second attempt: load the period list from a YAML file and run the
# INSERT ... SELECT against PostgreSQL.
import os
import logging
import psycopg2
import socket
import yaml
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()
# The YAML file is read at module level, before the __main__ guard.
with open(r'/home/SILVER/user/test/periods.yaml') as file:
# Presumably YEARS becomes a dict with a single 'periods' key holding a list
# (based on the YAML file shown with this script) -- verify against the file.
# NOTE(review): PyYAML follows YAML 1.1, where an unquoted scalar such as
# 1941_1945 resolves to the integer 19411945; quote the entries in the YAML
# file if the literal text is needed.
YEARS = yaml.load(file, Loader=yaml.FullLoader)
print("load yaml file" + '\n')
print(YEARS)
if __name__ == "__main__":
logging.info("Starting test process")
logging.info(" cpc = {}".format(cpc_name) + '\n')
connection = psycopg2.connect(user = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
password = os.environ.get("DATABASE_PASS", "pass"),
host = os.environ.get("DATABASE_HOST", "psql.host.com"),
port = 5432,
dbname = os.environ.get("DATABASE_NAME", "psql_db"),
options = "-c search_path=DATAVANT_O")
with connection.cursor() as cursor:
logging.info(str(connection.get_dsn_parameters()) + '\n')
cursor.execute("SELECT version();")
connection.commit()
# Despite its name, `conn` holds the row returned by SELECT version().
conn = cursor.fetchone()
logging.info("You are connected to - " + str(conn))
# NOTE(review): "YEARS[periods]" sits inside a string literal, so Python never
# evaluates it; the server receives that text verbatim. Substituting a value
# requires iterating over YEARS['periods'] and formatting each entry into the
# statement.
tempb = '''
INSERT INTO DATAVANT_STG_O.mortality_index_YEARS[periods]
SELECT * FROM DATAVANT_STG_O.tmp_mortality_YEARS[periods]; '''
cursor.execute(tempb)
logging.info("Performing Insert Operation")
connection.commit()
count = cursor.rowcount
# NOTE(review): the same applies here -- "YEARS[periods]" is logged as plain text.
logging.info(str(count) + " - count for the period: YEARS[periods]")
logging.info(" - count for the period: YEARS[periods]")
connection.close()
print("PostgreSQL connection is closed")
##YAML 文件
---
# Entries are quoted so they load as strings: under PyYAML's YAML 1.1 rules an
# unquoted 1940 is an integer and an unquoted 1941_1945 is the integer 19411945.
periods:
- '1940'
- '1941_1945'
- '1946_1950'
- '1951_1955'
- '1956_1960'
- '1961_1965'
- '1966_1970'
- '1971_1975'
- '1976_1980'
- '1981_1985'
- '1986_1990'
- '1991_1995'
- '1996_2000'
- '2001_2005'
- '2006_2010'
- '2011_2015'
- '2016_2020'
- '2021_2025'
正是在这一步我发现,无法把 YAML 文件中的值代入 Python 文件里的 SQL 语句。
很抱歉这个问题写得这么长,我只是想把情况说清楚,并提供尽可能多的信息。
如果有任何建议可以帮助我使用上述任何一种方法解决此问题,我将不胜感激,或者随时向我推荐一种新方法,谢谢!
## 下面是我尝试 Mike 的方法时所用的代码及其输出:
# Follow-up attempt: for each period, connect to PostgreSQL, run a COUNT(*)
# against the matching partition and log the result.
import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()
# '1940' plus the five-year spans '1941_1945', '1946_1950', ..., '2021_2025'.
periods = ['1940']
periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2026, 5)])
for period in periods:
# Do your psycopg2 connection here and get your cursor
connection = psycopg2.connect(user = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
password = os.environ.get("DATABASE_PASS", "pass"),
host = os.environ.get("DATABASE_HOST", "psql.silver.com"),
port = 5432,
dbname = os.environ.get("DATABASE_NAME", "psql_db"),
options = "-c search_path=DATAVANT_O")
with connection.cursor() as cursor:
logging.info(str(connection.get_dsn_parameters()) + '\n')
cursor.execute("SELECT version();")
connection.commit()
# Despite its name, `conn` holds the row returned by SELECT version().
conn = cursor.fetchone()
logging.info("You are connected to - " + str(conn))
# The template has one placeholder; the second argument to format() is unused.
cursor.execute("""
SELECT COUNT(*) FROM datavant_stg_o.mortality_index_{};""". format(period, period)
)
# Commit and close your connection here
connection.commit()
# NOTE(review): rowcount is the number of rows the last statement produced,
# and a COUNT(*) query produces exactly one row -- which is why 1 is logged
# for every period. The count itself is the first column of that row, i.e.
# cursor.fetchone()[0].
count = cursor.rowcount
logging.info("Count for the period {} is: " . format(period, period) + str(count) + '\n')
connection.close()
print("PostgreSQL connection is closed" + "\n")
(目前我只是尝试执行 COUNT(*) 来检查功能)
所以我期望分区的计数为:
SELECT COUNT(*) FROM datavant_stg_o.mortality_index_1940 - 1001066
SELECT COUNT(*) FROM datavant_stg_o.mortality_index_1941_1945 - 1713850
etc which are the original counts when i query from pgAdmin
但我得到的输出是
Count for the period 1940 is: 1,
Count for the period 1941_1945 is: 1 etc,
跟引号有关系吗?
如果您使用的是 python 3.6 或更高版本:
# Build the 18 partition suffixes: '1940' plus the five-year spans
# '1941_1945', '1946_1950', ..., '2021_2025'.
periods = ['1940']
periods.extend([f'{i}_{i + 4}' for i in range(1941, 2026, 5)])

for period in periods:
    # Do your psycopg2 connection here and get your cursor

    # The f-string substitutes the current period into both table names before
    # the statement is sent. Plain interpolation is acceptable only because
    # `periods` is generated in this script; identifiers that come from
    # outside should be composed with psycopg2.sql.Identifier instead.
    cursor.execute(f"""
INSERT INTO DATAVANT_STG_O.mortality_index_{period}
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period};""")

    # Commit and close your connection here
较早版本的 Python:
# Build the 18 partition suffixes: '1940' plus the five-year spans
# '1941_1945', '1946_1950', ..., '2021_2025'.
periods = ['1940']
periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2026, 5)])

for period in periods:
    # Do your psycopg2 connection here and get your cursor

    # str.format fills the two placeholders (target and source table) with the
    # current period. Plain interpolation is acceptable only because `periods`
    # is generated in this script; identifiers that come from outside should
    # be composed with psycopg2.sql.Identifier instead.
    cursor.execute("""
INSERT INTO DATAVANT_STG_O.mortality_index_{}
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{};""".format(period, period)
    )

    # Commit and close your connection here
我有一张巨大的表,下面有很多分区表,但我只想查询/更新其中的几个(18 个)分区。因此我不得不准备并运行 18 个 Python 脚本来执行我的插入语句(按照我们 DBA 的要求,每向一个分区表插入完数据后都必须新建并关闭连接,因为每个 period 都包含数百万条记录)。所以我尝试了两种方法来解决这个问题:第一种是运行一个 for 循环,把 period 列表中的值逐个替换进我的 SQL 语句,这需要编写两个 Python 脚本:
# Driver script: meant to launch sample_segment.py once for each period label.
import os
import logging
#import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
# Host name of the machine running the script; only used in the log line below.
cpc_name = socket.gethostname()
# The 18 period labels, each wrapped in a single-key dict.
list_of_periods = [
{'period': '1940'},
{'period': '1941_1945'},
{'period': '1946_1950'},
{'period': '1951_1955'},
{'period': '1956_1960'},
{'period': '1961_1965'},
{'period': '1966_1970'},
{'period': '1971_1975'},
{'period': '1976_1980'},
{'period': '1981_1985'},
{'period': '1986_1990'},
{'period': '1991_1995'},
{'period': '1996_2000'},
{'period': '2001_2005'},
{'period': '2006_2010'},
{'period': '2011_2015'},
{'period': '2016_2020'},
{'period': '2021_2025'}
]
if __name__ == "__main__":
logging.info("Starting test process")
logging.info(" cpc = {}".format(cpc_name) + '\n')
# NOTE(review): the loop variable `period` is never handed to the child
# process -- the command string below is identical on every iteration, so
# sample_segment.py cannot know which period to load. Pass it explicitly
# (e.g. as a command-line argument that the child reads from sys.argv).
for period in list_of_periods:
os.system('python sample_segment.py')
########
sample_segment.py
# Worker script: connects to PostgreSQL, runs one INSERT ... SELECT, logs the
# cursor's row count and closes the connection.
import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()
if __name__ == "__main__":
logging.info("Starting test process")
logging.info(" cpc = {}".format(cpc_name) + '\n')
# Connection settings are read from environment variables, with hard-coded
# fallbacks when a variable is not set.
connection = psycopg2.connect(user = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
password = os.environ.get("DATABASE_PASS", "pass"),
host = os.environ.get("DATABASE_HOST", "psql.silver.com"),
port = 5432,
dbname = os.environ.get("DATABASE_NAME", "psql_db"),
options = "-c search_path=DATAVANT_O")
with connection.cursor() as cursor:
logging.info(str(connection.get_dsn_parameters()) + '\n')
cursor.execute("SELECT version();")
connection.commit()
# Despite its name, `conn` holds the row returned by SELECT version().
conn = cursor.fetchone()
logging.info("You are connected to - " + str(conn))
# NOTE(review): this is an ordinary string -- it is not an f-string and
# .format() is never called on it -- so the text "{period}" is sent to the
# server literally. No `period` variable is defined in this script either.
tempb = '''
INSERT INTO DATAVANT_STG_O.mortality_index_{period}
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period}; '''
logging.info("Performing Insert Operation")
cursor.execute(tempb)
connection.commit()
# After an INSERT, rowcount is the number of rows the statement affected.
count = cursor.rowcount
# NOTE(review): "{period}" is not substituted in these log messages either.
logging.info(str(count) + " - count for the period: {period}")
logging.info(" - count for the period: {period}")
connection.close()
print("PostgreSQL connection is closed")
显然这行不通。可能是我的 Python 编程水平有限,又或者我其实是在找类似 os.system.sql_with_parameters() 这样的东西。
但无论如何,我的意图是把下面的语句
INSERT INTO DATAVANT_STG_O.mortality_index_{period}
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period};
转换为:
INSERT INTO DATAVANT_STG_O.mortality_index_1940
SELECT * FROM DATAVANT_STG_O.tmp_mortality_1940;
INSERT INTO DATAVANT_STG_O.mortality_index_1941_1945
SELECT * FROM DATAVANT_STG_O.tmp_mortality_1941_1945;
etc...for 18 periods
我尝试的第二种方法是在我的 python 脚本中读取 YAML 文件。
# Second attempt: load the period list from a YAML file and run the
# INSERT ... SELECT against PostgreSQL.
import os
import logging
import psycopg2
import socket
import yaml
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()
# The YAML file is read at module level, before the __main__ guard.
with open(r'/home/SILVER/user/test/periods.yaml') as file:
# Presumably YEARS becomes a dict with a single 'periods' key holding a list
# (based on the YAML file shown with this script) -- verify against the file.
# NOTE(review): PyYAML follows YAML 1.1, where an unquoted scalar such as
# 1941_1945 resolves to the integer 19411945; quote the entries in the YAML
# file if the literal text is needed.
YEARS = yaml.load(file, Loader=yaml.FullLoader)
print("load yaml file" + '\n')
print(YEARS)
if __name__ == "__main__":
logging.info("Starting test process")
logging.info(" cpc = {}".format(cpc_name) + '\n')
connection = psycopg2.connect(user = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
password = os.environ.get("DATABASE_PASS", "pass"),
host = os.environ.get("DATABASE_HOST", "psql.host.com"),
port = 5432,
dbname = os.environ.get("DATABASE_NAME", "psql_db"),
options = "-c search_path=DATAVANT_O")
with connection.cursor() as cursor:
logging.info(str(connection.get_dsn_parameters()) + '\n')
cursor.execute("SELECT version();")
connection.commit()
# Despite its name, `conn` holds the row returned by SELECT version().
conn = cursor.fetchone()
logging.info("You are connected to - " + str(conn))
# NOTE(review): "YEARS[periods]" sits inside a string literal, so Python never
# evaluates it; the server receives that text verbatim. Substituting a value
# requires iterating over YEARS['periods'] and formatting each entry into the
# statement.
tempb = '''
INSERT INTO DATAVANT_STG_O.mortality_index_YEARS[periods]
SELECT * FROM DATAVANT_STG_O.tmp_mortality_YEARS[periods]; '''
cursor.execute(tempb)
logging.info("Performing Insert Operation")
connection.commit()
count = cursor.rowcount
# NOTE(review): the same applies here -- "YEARS[periods]" is logged as plain text.
logging.info(str(count) + " - count for the period: YEARS[periods]")
logging.info(" - count for the period: YEARS[periods]")
connection.close()
print("PostgreSQL connection is closed")
##YAML 文件
---
# Entries are quoted so they load as strings: under PyYAML's YAML 1.1 rules an
# unquoted 1940 is an integer and an unquoted 1941_1945 is the integer 19411945.
periods:
- '1940'
- '1941_1945'
- '1946_1950'
- '1951_1955'
- '1956_1960'
- '1961_1965'
- '1966_1970'
- '1971_1975'
- '1976_1980'
- '1981_1985'
- '1986_1990'
- '1991_1995'
- '1996_2000'
- '2001_2005'
- '2006_2010'
- '2011_2015'
- '2016_2020'
- '2021_2025'
正是在这一步我发现,无法把 YAML 文件中的值代入 Python 文件里的 SQL 语句。
很抱歉这个问题写得这么长,我只是想把情况说清楚,并提供尽可能多的信息。
如果有任何建议可以帮助我使用上述任何一种方法解决此问题,我将不胜感激,或者随时向我推荐一种新方法,谢谢!
## 下面是我尝试 Mike 的方法时所用的代码及其输出:
# Follow-up attempt: for each period, connect to PostgreSQL, run a COUNT(*)
# against the matching partition and log the result.
import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()
# '1940' plus the five-year spans '1941_1945', '1946_1950', ..., '2021_2025'.
periods = ['1940']
periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2026, 5)])
for period in periods:
# Do your psycopg2 connection here and get your cursor
connection = psycopg2.connect(user = os.environ.get("DATABASE_USER", "SVTDATAVANT"),
password = os.environ.get("DATABASE_PASS", "pass"),
host = os.environ.get("DATABASE_HOST", "psql.silver.com"),
port = 5432,
dbname = os.environ.get("DATABASE_NAME", "psql_db"),
options = "-c search_path=DATAVANT_O")
with connection.cursor() as cursor:
logging.info(str(connection.get_dsn_parameters()) + '\n')
cursor.execute("SELECT version();")
connection.commit()
# Despite its name, `conn` holds the row returned by SELECT version().
conn = cursor.fetchone()
logging.info("You are connected to - " + str(conn))
# The template has one placeholder; the second argument to format() is unused.
cursor.execute("""
SELECT COUNT(*) FROM datavant_stg_o.mortality_index_{};""". format(period, period)
)
# Commit and close your connection here
connection.commit()
# NOTE(review): rowcount is the number of rows the last statement produced,
# and a COUNT(*) query produces exactly one row -- which is why 1 is logged
# for every period. The count itself is the first column of that row, i.e.
# cursor.fetchone()[0].
count = cursor.rowcount
logging.info("Count for the period {} is: " . format(period, period) + str(count) + '\n')
connection.close()
print("PostgreSQL connection is closed" + "\n")
(目前我只是尝试执行 COUNT(*) 来检查功能)所以我期望各分区的计数为:
SELECT COUNT(*) FROM datavant_stg_o.mortality_index_1940 - 1001066
SELECT COUNT(*) FROM datavant_stg_o.mortality_index_1941_1945 - 1713850
etc which are the original counts when i query from pgAdmin
但我得到的输出是
Count for the period 1940 is: 1,
Count for the period 1941_1945 is: 1 etc,
跟引号有关系吗?
如果您使用的是 python 3.6 或更高版本:
# Build the 18 partition suffixes: '1940' plus the five-year spans
# '1941_1945', '1946_1950', ..., '2021_2025'.
periods = ['1940']
periods.extend([f'{i}_{i + 4}' for i in range(1941, 2026, 5)])

for period in periods:
    # Do your psycopg2 connection here and get your cursor

    # The f-string substitutes the current period into both table names before
    # the statement is sent. Plain interpolation is acceptable only because
    # `periods` is generated in this script; identifiers that come from
    # outside should be composed with psycopg2.sql.Identifier instead.
    cursor.execute(f"""
INSERT INTO DATAVANT_STG_O.mortality_index_{period}
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{period};""")

    # Commit and close your connection here
较早版本的 Python:
# Build the 18 partition suffixes: '1940' plus the five-year spans
# '1941_1945', '1946_1950', ..., '2021_2025'.
periods = ['1940']
periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2026, 5)])

for period in periods:
    # Do your psycopg2 connection here and get your cursor

    # str.format fills the two placeholders (target and source table) with the
    # current period. Plain interpolation is acceptable only because `periods`
    # is generated in this script; identifiers that come from outside should
    # be composed with psycopg2.sql.Identifier instead.
    cursor.execute("""
INSERT INTO DATAVANT_STG_O.mortality_index_{}
SELECT * FROM DATAVANT_STG_O.tmp_mortality_{};""".format(period, period)
    )

    # Commit and close your connection here