Multi-row UPSERT (INSERT or UPDATE) from Python
I am currently executing the simple query below with Python, using pyodbc to insert data into a SQL Server table:
import pyodbc
table_name = 'my_table'
insert_values = [(1,2,3),(2,2,4),(3,4,5)]
cnxn = pyodbc.connect(...)
cursor = cnxn.cursor()
cursor.execute(
    ' '.join([
        'insert into',
        table_name,
        'values',
        ','.join(
            [str(i) for i in insert_values]
        )
    ])
)
cursor.commit()
This should work as long as there are no duplicate keys (let's assume the first column contains the key). However, for data with duplicate keys (i.e. rows already present in the table) it will raise an error.
How can I insert multiple rows into a SQL Server table at once using pyodbc, such that rows with duplicate keys get updated instead?
Note: solutions have been proposed for single rows of data, but I would like to insert multiple rows at once (and avoid loops)!
This can be done using MERGE. Say you have a key column ID, and two other columns col_a and col_b (you need to specify the column names in the UPDATE statement); the statement would then look like this:
MERGE INTO MyTable as Target
USING (SELECT * FROM
(VALUES (1, 2, 3), (2, 2, 4), (3, 4, 5))
AS s (ID, col_a, col_b)
) AS Source
ON Target.ID=Source.ID
WHEN NOT MATCHED THEN
INSERT (ID, col_a, col_b) VALUES (Source.ID, Source.col_a, Source.col_b)
WHEN MATCHED THEN
UPDATE SET col_a=Source.col_a, col_b=Source.col_b;
You can try it out at rextester.com/IONFW62765.
Basically, I am creating a Source table "on-the-fly" using the list of values you want to upsert. When you then merge the Source table with the Target, you can test the MATCHED condition (Target.ID=Source.ID) on every row (whereas you would be limited to a single row when using a simple IF <exists> INSERT (...) ELSE UPDATE (...) construct).
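For contrast, here is a minimal sketch of that single-row approach with pyodbc (hypothetical connection string, reusing the MyTable/ID/col_a/col_b names from above); it needs one round-trip per row, which is exactly the looping the MERGE avoids:
import pyodbc

cnxn = pyodbc.connect('DSN=my_dsn')  # hypothetical connection; adjust to your environment
cursor = cnxn.cursor()
insert_values = [(1, 2, 3), (2, 2, 4), (3, 4, 5)]
# one statement per row: update if the key already exists, otherwise insert
for id_, a, b in insert_values:
    cursor.execute(
        "IF EXISTS (SELECT 1 FROM MyTable WHERE ID = ?) "
        "UPDATE MyTable SET col_a = ?, col_b = ? WHERE ID = ? "
        "ELSE "
        "INSERT INTO MyTable (ID, col_a, col_b) VALUES (?, ?, ?);",
        id_, a, b, id_, id_, a, b)
cnxn.commit()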
In Python with pyodbc, it might look like this:
import pyodbc
insert_values = [(1, 2, 3), (2, 2, 4), (3, 4, 5)]
table_name = 'my_table'
key_col = 'ID'
col_a = 'col_a'
col_b = 'col_b'
cnxn = pyodbc.connect(...)
cursor = cnxn.cursor()
cursor.execute(('MERGE INTO {table_name} as Target '
                'USING (SELECT * FROM '
                '(VALUES {vals}) '
                'AS s ({k}, {a}, {b}) '
                ') AS Source '
                'ON Target.{k}=Source.{k} '
                'WHEN NOT MATCHED THEN '
                'INSERT ({k}, {a}, {b}) VALUES (Source.{k}, Source.{a}, Source.{b}) '
                'WHEN MATCHED THEN '
                'UPDATE SET {a}=Source.{a}, {b}=Source.{b};'
                .format(table_name=table_name,
                        vals=','.join([str(i) for i in insert_values]),
                        k=key_col,
                        a=col_a,
                        b=col_b)))
cursor.commit()
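As a side note, the snippet assumes my_table already exists and that ID is its key; a minimal sketch of such a table (the int column types are an assumption, adjust to your data) would be:
import pyodbc

cnxn = pyodbc.connect('DSN=my_dsn')  # hypothetical connection; adjust to your environment
cursor = cnxn.cursor()
# ID is the key compared in the ON clause; col_a and col_b are the updatable columns
cursor.execute(
    "CREATE TABLE my_table ("
    "ID int NOT NULL PRIMARY KEY, "
    "col_a int, "
    "col_b int);")
cursor.commit()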
You can read more about MERGE in the SQL Server docs.
Given a dataframe (df), I used the code from ksbg's answer to upsert into a table. Note that I look for a match on two columns (date and station code), but you can use just one. The code generates the query for any given df.
def append(df, c):
    table_name = 'ddf.ddf_actuals'
    columns_list = df.columns.tolist()
    columns_list_query = f'({(",".join(columns_list))})'
    sr_columns_list = [f'Source.{i}' for i in columns_list]
    sr_columns_list_query = f'({(",".join(sr_columns_list))})'
    up_columns_list = [f'{i}=Source.{i}' for i in columns_list]
    up_columns_list_query = f'{",".join(up_columns_list)}'
    rows_to_insert = [row.tolist() for idx, row in df.iterrows()]
    rows_to_insert = str(rows_to_insert).replace('[', '(').replace(']', ')')[1:][:-1]
    query = f"MERGE INTO {table_name} as Target \
              USING (SELECT * FROM \
              (VALUES {rows_to_insert}) \
              AS s {columns_list_query}\
              ) AS Source \
              ON Target.stationcode=Source.stationcode AND Target.date=Source.date \
              WHEN NOT MATCHED THEN \
              INSERT {columns_list_query} VALUES {sr_columns_list_query} \
              WHEN MATCHED THEN \
              UPDATE SET {up_columns_list_query};"
    c.execute(query)
    c.commit()
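A hypothetical call might look like this, assuming a dataframe whose columns (here stationcode, date, and a made-up actuals column) exactly match the ddf.ddf_actuals table:
import pandas as pd
import pyodbc

# hypothetical data; the column names must match the target table
df = pd.DataFrame({
    'stationcode': ['A1', 'A2'],
    'date': ['2021-01-01', '2021-01-01'],
    'actuals': [10, 20],
})
cnxn = pyodbc.connect('DSN=my_dsn')  # adjust to your environment
append(df, cnxn.cursor())  # builds and executes the MERGE, then commits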
Following up on the existing answers here: since they are potentially vulnerable to injection attacks, it is better to use parameterized queries (for mssql/pyodbc, these are the "?" placeholders). I slightly adjusted Alexander Novas' code to use dataframe rows in a parameterized version of the query with sqlalchemy:
# assuming you already have a dataframe "df" and a sqlalchemy engine called "engine"
# also assumes your dataframe columns all have the same names as the existing table
import pandas as pd   # used below for pd.NA
import numpy as np    # used below for np.nan
table_name_to_update = 'update_table'
table_name_to_transfer = 'placeholder_table'
# the dataframe and existing table should both have a column to use as the primary key
primary_key_col = 'id'
# replace the placeholder table with the dataframe
df.to_sql(table_name_to_transfer, engine, if_exists='replace', index=False)
# building the command terms
cols_list = df.columns.tolist()
cols_list_query = f'({(", ".join(cols_list))})'
sr_cols_list = [f'Source.{i}' for i in cols_list]
sr_cols_list_query = f'({(", ".join(sr_cols_list))})'
up_cols_list = [f'{i}=Source.{i}' for i in cols_list]
up_cols_list_query = f'{", ".join(up_cols_list)}'
# fill values that should be interpreted as "NULL" with None
def fill_null(vals: list) -> tuple:
    def bad(val):
        if isinstance(val, type(pd.NA)):
            return True
        # the list of values you want to interpret as 'NULL' should be
        # tweaked to your needs
        return val in ['NULL', np.nan, 'nan', '', '-', '?']
    return tuple(i if not bad(i) else None for i in vals)

# create the list of parameter indicators (?, ?, ?, etc...)
# and the parameters, which are the values to be inserted
params = [fill_null(row.tolist()) for _, row in df.iterrows()]
param_slots = '(' + ', '.join(['?'] * len(df.columns)) + ')'
cmd = f'''
MERGE INTO {table_name_to_update} as Target
USING (SELECT * FROM
(VALUES {param_slots})
AS s {cols_list_query}
) AS Source
ON Target.{primary_key_col}=Source.{primary_key_col}
WHEN NOT MATCHED THEN
INSERT {cols_list_query} VALUES {sr_cols_list_query}
WHEN MATCHED THEN
UPDATE SET {up_cols_list_query};
'''
# execute the command to merge tables
with engine.begin() as conn:
    conn.execute(cmd, params)
This approach is also better if you are inserting strings that contain characters incompatible with a literal SQL insert (such as apostrophes, which mess up the insert statement), since it lets the connection engine handle the parameterized values (which also makes it safer against SQL injection attacks).
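As a quick illustration of that point (the table and column names here are hypothetical), a value containing an apostrophe passes through a "?" placeholder untouched, whereas it would break a string-formatted statement:
import pyodbc

cnxn = pyodbc.connect('DSN=my_dsn')  # adjust to your environment
cursor = cnxn.cursor()
name = "O'Brien"  # would terminate the string literal in a formatted INSERT
# the driver sends the value separately from the SQL text, so no escaping is needed
cursor.execute("INSERT INTO update_table (id, name) VALUES (?, ?)", 1, name)
cnxn.commit()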
For reference, I am creating the engine connection using this code - you will obviously need to adapt it to your server/database/environment, and decide whether or not you want fast_executemany:
import urllib
import pyodbc
pyodbc.pooling = False
import sqlalchemy
terms = urllib.parse.quote_plus(
    'DRIVER={SQL Server Native Client 11.0};'
    'SERVER=<your server>;'
    'DATABASE=<your database>;'
    'Trusted_Connection=yes;'  # to log on using Windows credentials
)
url = f'mssql+pyodbc:///?odbc_connect={terms}'
engine = sqlalchemy.create_engine(url, fast_executemany=True)
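A minimal sanity check that the engine actually connects before running the MERGE (this works on both SQLAlchemy 1.4 and 2.0):
# simple connectivity check; raises immediately if the connection URL is wrong
with engine.connect() as conn:
    print(conn.execute(sqlalchemy.text('SELECT 1')).scalar())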
Edit: I realized that this code does not actually make use of the "placeholder" table at all, and simply copies values directly from the dataframe rows via the parameterized command.