Python code to invoke SQL Server stored procedure to ingest CSVs takes HOURS to execute
I am using Python to read the CSV via Pandas, fix some of the fields, and then write the data row by row into a table in SQL Server. Bulk import is disabled on the server; I am doing it this way because there will eventually be dozens of these files and I want to download and ingest them automatically. I could see this taking a few minutes, but a run takes HOURS.
I know I could bulk upload these in seconds if bulk import were enabled, but that probably is not an option.
The problem is that with Python each run can take 1 to 3 hours, which is not acceptable. I would like to know if there is a faster way to do the upload: either something I can do to the table to speed up the import, or a different way of coding it.
Here is an example of the kind of code I am using:
def ingest_glief_reporting_exceptions_csv():
    global conn
    global cursor

    filename = r"20200824-0800-gleif-goldencopy-repex-golden-copy.csv"
    # filename = r"repex_1K.csv"
    full_filename = os.path.join(raw_data_dir, filename)

    sql_str = "exec dbo.util_get_gleif_last_reporting_exception"
    cursor.execute(sql_str)
    last_lei = ''
    for result in cursor.fetchall():
        last_lei = result[0]

    # "repex" is short for "reporting exceptions", shorten the headers
    repex_headers = [
        'LEI',
        'ExceptionCategory',
        'ExceptionReason1',
        'ExceptionReason2',
        'ExceptionReason3',
        'ExceptionReason4',
        'ExceptionReason5',
        'ExceptionReference1',
        'ExceptionReference2',
        'ExceptionReference3',
        'ExceptionReference4',
        'ExceptionReference5'
    ]

    df = pd.read_csv(full_filename, header=0, quotechar='"')
    # Change to the column headers generated in VBA
    df.columns = repex_headers
    for colname in df.columns:
        df[colname] = df[colname].astype(str)
        df[colname] = df[colname].replace({'nan': ''})

    place_holder = '?,?'
    for i in range(1, len(repex_headers)):
        place_holder += ',?'

    sql_str = "exec save_gleif_reporting_exception " + place_holder

    row_count = 0
    row = dict()
    do_not_upload = True
    if last_lei == '':
        do_not_upload = False  # There was no last uploaded record, so we can start now

    for index, row in df.iterrows():
        row_count += 1
        if do_not_upload:
            if row['LEI'] == last_lei:
                do_not_upload = False
                continue
            else:
                continue

        values = (
            row['LEI'],
            row['ExceptionCategory'],
            row['ExceptionReason1'],
            row['ExceptionReason2'],
            row['ExceptionReason3'],
            row['ExceptionReason4'],
            row['ExceptionReason5'],
            row['ExceptionReference1'],
            row['ExceptionReference2'],
            row['ExceptionReference3'],
            row['ExceptionReference4'],
            row['ExceptionReference5'],
            filename
        )
        if index % 1000 == 0:
            print("Imported %s rows" % (index))
        # print(values)
        # print("processing row ", row_count)

        # return key is the unique ID the database generated as it inserted this row of data.
        error_sql_str = "exec log_message ?,?,?,?,?, ?,?,?,?"
        connection_failures = 0
        connection_failing = True
        while connection_failures < 3 and connection_failing:
            try:
                return_key = cursor.execute(sql_str, values).fetchval()
            except pyodbc.OperationalError as e:
                connection_failures += 1
                connection_failing = True
                print("Connection issue. connection failures = ", connection_failures)
                time.sleep(30)  # wait 30 seconds and go to the top of the loop to try again.
                continue
            except pyodbc.ProgrammingError as e:
                print("Bad field ", values)
                error_values = (
                    'ERROR',
                    __file__,
                    filename,
                    'gleif_reporting_exceptions',
                    row['LEI'],
                    '',
                    '',
                    '',
                    str(e)
                )
                return_key = cursor.execute(error_sql_str, error_values).fetchval()
                connection_failures = 0
            connection_failures = 0
            connection_failing = False

        if connection_failures >= 3:
            print("Unable to reconnect after 3 tries")
            exit(1)

    conn.close()
    return
I open the database connection like this:
def init_connection(server_name, db_name):
    """
    Connect to SQL Server database
    :param server_name:
    :param db_name:
    :return:
    """
    pyodbc.pooling = False
    try:
        conn = pyodbc.connect(
            r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name + '; \
            Database=' + db_name + ';Trusted_Connection=yes;', timeout=5, autocommit=True)
    except Exception as e:
        print("Unable to connect to database [" + db_name + '] and server [' + server_name + ']')
        print(e)
        exit(1)
    cursor = conn.cursor()
    return [conn, cursor]
The table is defined like this:
CREATE TABLE [dbo].[gleif_exceptions](
    [id] [bigint] IDENTITY(1,1) NOT NULL,
    [ida_last_update_date] [datetime] NULL,
    [ida_last_update_source_file] [nvarchar](500) NULL,
    [LEI] [nvarchar](500) NULL,
    [ExceptionCategory] [nvarchar](500) NULL,
    [ExceptionReason1] [nvarchar](500) NULL,
    [ExceptionReason2] [nvarchar](500) NULL,
    [ExceptionReason3] [nvarchar](500) NULL,
    [ExceptionReason4] [nvarchar](500) NULL,
    [ExceptionReason5] [nvarchar](500) NULL,
    [ExceptionReference1] [nvarchar](500) NULL,
    [ExceptionReference2] [nvarchar](500) NULL,
    [ExceptionReference3] [nvarchar](500) NULL,
    [ExceptionReference4] [nvarchar](500) NULL,
    [ExceptionReference5] [nvarchar](500) NULL
) ON [PRIMARY]
GO
Here is some sample data:
LEI,Exception.Category,Exception.Reason.1,Exception.Reason.2,Exception.Reason.3,Exception.Reason.4,Exception.Reason.5,Exception.Reference.1,Exception.Reference.2,Exception.Reference.3,Exception.Reference.4,Exception.Reference.5
004L5FPTUREIWK9T2N63,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
00EHHQ2ZHDCFXJCPCL46,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
And here is the corresponding stored procedure I call to store the records into the table:
ALTER PROCEDURE [dbo].[save_gleif_reporting_exception]
    @LEI [nvarchar](500) = NULL,
    @ExceptionCategory [nvarchar](500) = NULL,
    @ExceptionReason1 [nvarchar](500) = NULL,
    @ExceptionReason2 [nvarchar](500) = NULL,
    @ExceptionReason3 [nvarchar](500) = NULL,
    @ExceptionReason4 [nvarchar](500) = NULL,
    @ExceptionReason5 [nvarchar](500) = NULL,
    @ExceptionReference1 [nvarchar](500) = NULL,
    @ExceptionReference2 [nvarchar](500) = NULL,
    @ExceptionReference3 [nvarchar](500) = NULL,
    @ExceptionReference4 [nvarchar](500) = NULL,
    @ExceptionReference5 [nvarchar](500) = NULL,
    @ida_last_update_source_file [nvarchar](500) = NULL
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON;

    -- Insert statements for procedure here
    INSERT INTO dbo.gleif_reporting_exceptions(
        [LEI],
        [ExceptionCategory],
        [ExceptionReason1],
        [ExceptionReason2],
        [ExceptionReason3],
        [ExceptionReason4],
        [ExceptionReason5],
        [ExceptionReference1],
        [ExceptionReference2],
        [ExceptionReference3],
        [ExceptionReference4],
        [ExceptionReference5],
        [ida_last_update_date],
        [ida_last_update_source_file]
    )
    VALUES (
        @LEI,
        @ExceptionCategory,
        @ExceptionReason1,
        @ExceptionReason2,
        @ExceptionReason3,
        @ExceptionReason4,
        @ExceptionReason5,
        @ExceptionReference1,
        @ExceptionReference2,
        @ExceptionReference3,
        @ExceptionReference4,
        @ExceptionReference5,
        GETDATE(),
        @ida_last_update_source_file
    )

    SELECT @@IDENTITY
END
Note 1: Although I declare the strings as nvarchar(500), most of them are nowhere near that long. I don't think that matters; I tried shorter string definitions and the routine still took just as long to run.
Note 2: This is just one of 7 such files so far. The smallest tables are a few tens of thousands of rows; the largest run to a few million. The number of columns varies from 7 to about 230.
Turn off autocommit:
conn = pyodbc.connect(
    r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name + '; \
    Database=' + db_name + ';Trusted_Connection=yes;', timeout=5, autocommit=False)
and commit here, and again at the end of the loop:
if index % 1000 == 0:
    print("Imported %s rows" % (index))
With autocommit on, you have to wait for the transaction log to be flushed to disk after every single row.
To optimize further, if you are on SQL Server 2016+, send batches of rows to SQL Server as JSON and parse them on the server side with OPENJSON.
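A hedged sketch of what that JSON approach could look like, assuming the same df, cursor, conn, and filename as in the question's code (only two of the twelve columns are shown to keep the example short; the real statement would list all of them):

# Sketch only: send 1,000 rows per round trip as one JSON document and let
# OPENJSON (SQL Server 2016+) unpack it on the server side.
batch_insert_sql = """
    INSERT INTO dbo.gleif_reporting_exceptions
        (LEI, ExceptionCategory, ida_last_update_date, ida_last_update_source_file)
    SELECT j.LEI, j.ExceptionCategory, GETDATE(), ?
    FROM OPENJSON(?)
    WITH (
        LEI               nvarchar(500) '$.LEI',
        ExceptionCategory nvarchar(500) '$.ExceptionCategory'
    ) AS j;
"""

batch_size = 1000
for start in range(0, len(df), batch_size):
    payload = df.iloc[start:start + batch_size].to_json(orient="records")
    cursor.execute(batch_insert_sql, filename, payload)

conn.commit()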
Since you don't need the return value from the stored procedure, you should be able to just use pandas' to_sql
method to insert the rows directly into the table. This code ...
from time import time
import pandas as pd
import sqlalchemy as sa

from_engine = sa.create_engine("mssql+pyodbc://@mssqlLocal64")
to_engine = sa.create_engine(
    "mssql+pyodbc://sa:_whatever_@192.168.0.199/mydb"
    "?driver=ODBC+Driver+17+for+SQL+Server",
    fast_executemany=False,
)

# set up test
to_cnxn = to_engine.raw_connection()
to_cnxn.execute("TRUNCATE TABLE MillionRows")
to_cnxn.commit()
num_rows_to_upload = 10000
df = pd.read_sql_query(
    f"SELECT TOP {num_rows_to_upload} "
    "[TextField], [LongIntegerField], [DoubleField], [varchar_column] "
    "FROM MillionRows ORDER BY ID",
    from_engine,
)

# run test
t0 = time()
df.to_sql("MillionRows", to_engine, index=False, if_exists="append")
s = f"{(time() - t0):0.1f} seconds"
print(f"uploading {num_rows_to_upload:,d} rows took {s}")
… represents roughly the same level of work under the hood as what you are doing now, i.e., uploading each row as a separate .execute
call. The result was
uploading 10,000 rows took 60.2 seconds
However, simply changing to_engine to use fast_executemany=True results in
uploading 10,000 rows took 1.4 seconds
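Applied to the question's DataFrame, a rough sketch might look like the following (server and database names are placeholders; df and filename are assumed from the question's code, and the two audit columns that the stored procedure normally fills in are added on the Python side because to_sql bypasses the procedure):

from datetime import datetime

import pandas as pd
import sqlalchemy as sa

# Placeholder server/database; adjust to the real environment.
engine = sa.create_engine(
    "mssql+pyodbc://my_server/my_db"
    "?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes",
    fast_executemany=True,
)

# Columns the stored procedure would otherwise populate.
df["ida_last_update_date"] = datetime.now()
df["ida_last_update_source_file"] = filename

df.to_sql(
    "gleif_reporting_exceptions",
    engine,
    schema="dbo",
    index=False,
    if_exists="append",
    chunksize=10_000,   # rows per executemany batch
)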