Python 要调用的代码 SQL 用于摄取 CSV 的服务器存储过程需要 HOURS 才能执行

Python code to invoke SQL Server stored procedure to ingest CSVs takes HOURS to execute

我正在使用 python 通过 Pandas 读取 CSV,修复一些字段,然后逐行将数据写入 table in SQL 服务器。服务器上禁用批量导入——同样,因为最终会有几十个这样的文件,以自动下载和摄取文件。我可以看到这需要几分钟,但 运行.

需要 HOURS

我知道我可以在几秒钟内批量上传这些东西,如果启用的话,但这可能是不可能的。

问题是使用 python 每个 运行 可能需要 1 到 3 个小时。这不是acceptable。我想知道是否有更快的上传方式。我可以用 table 做些什么来加快导入速度,或者使用不同的编码方式。

这是我正在使用的代码类型的示例:

def ingest_glief_reporting_exceptions_csv():
    global conn
    global cursor
    filename = r"20200824-0800-gleif-goldencopy-repex-golden-copy.csv"
    # filename = r"repex_1K.csv"

    full_filename = os.path.join(raw_data_dir, filename)

    sql_str = "exec dbo.util_get_gleif_last_reporting_exception"
    cursor.execute(sql_str)
    last_lei = ''
    for result in cursor.fetchall():
        last_lei = result[0]

    # "repex" is short for "reporting exceptions", shorten the headers
    repex_headers = [
        'LEI',
        'ExceptionCategory',
        'ExceptionReason1',
        'ExceptionReason2',
        'ExceptionReason3',
        'ExceptionReason4',
        'ExceptionReason5',
        'ExceptionReference1',
        'ExceptionReference2',
        'ExceptionReference3',
        'ExceptionReference4',
        'ExceptionReference5'
    ]

    df = pd.read_csv(full_filename, header=0, quotechar='"')

    # Change to the column headers generated in VBA
    df.columns = repex_headers

    for colname in df.columns:
        df[colname] = df[colname].astype(str)
        df[colname] = df[colname].replace({'nan': ''})


    place_holder = '?,?'
    for i in range(1, len(repex_headers)):
        place_holder += ',?'

    sql_str = "exec save_gleif_reporting_exception " + place_holder

    row_count = 0
    row = dict()
    do_not_upload = True
    if last_lei == '':
        do_not_upload = False   # There was no last uploaded record, so we can start now

    for index, row in df.iterrows():
        row_count += 1
        if do_not_upload:
            if row['LEI'] == last_lei:
                do_not_upload = False
                continue
            else:
                continue

        values = (
            row['LEI'],
            row['ExceptionCategory'],
            row['ExceptionReason1'],
            row['ExceptionReason2'],
            row['ExceptionReason3'],
            row['ExceptionReason4'],
            row['ExceptionReason5'],
            row['ExceptionReference1'],
            row['ExceptionReference2'],
            row['ExceptionReference3'],
            row['ExceptionReference4'],
            row['ExceptionReference5'],
            filename
        )

        if index % 1000 == 0:
                print("Imported %s rows" % (index))

        # print(values)
        # print("processing row ", row_count)
        # return Key is the unique ID the database generated as it inserted this row of data.
        error_sql_str = "exec log_message ?,?,?,?,?, ?,?,?,?"
        connection_failures = 0
        connection_failing = True
        while connection_failures < 3 and connection_failing:
            try:
                return_key = cursor.execute(sql_str, values).fetchval()
            except pyodbc.OperationalError as e:
                connection_failures += 1
                connection_failing = True
                print("Connection issue.  connection failures = ", connection_failures)
                time.sleep(30)      # wait 30 seconds and go to the top of the loop to try again.
                continue
            except pyodbc.ProgrammingError as e:
                print("Bad field ", values)
                error_values = (
                    'ERROR',
                    __file__,
                    filename,
                    'gleif_reporting_exceptions',
                    row['LEI'],
                    '',
                    '',
                    '',
                    str(e)
                )
                return_key = cursor.execute(error_sql_str, error_values).fetchval()
                connection_failures = 0
            connection_failures = 0
            connection_failing = False

        if connection_failures >= 3:
            print("Unable to reconnect after 3 tries")
            exit(1)

    conn.close()
    return

我是这样打开数据库的:

def init_connection(server_name, db_name):
    """
    Connect to SQL Server database
    :param server_name:
    :param db_name:
    :return:
    """
    pyodbc.pooling = False
    try:
        conn = pyodbc.connect(
            r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name + '; \
            Database=' + db_name + ';Trusted_Connection=yes;', timeout=5, autocommit=True)
    except Exception as e:
        print("Unable to connect to database [" + db_name + '] and server [' + server_name + ']')
        print(e)
        exit(1)

    cursor = conn.cursor()
    return [conn, cursor]

好的。

table的定义是这样的:

CREATE TABLE [dbo].[gleif_exceptions](
    [id] [bigint] IDENTITY(1,1) NOT NULL,
    [ida_last_update_date] [datetime] NULL,
    [ida_last_update_source_file] [nvarchar](500) NULL,
    [LEI] [nvarchar](500) NULL,
    [ExceptionCategory] [nvarchar](500) NULL,
    [ExceptionReason1] [nvarchar](500) NULL,
    [ExceptionReason2] [nvarchar](500) NULL,
    [ExceptionReason3] [nvarchar](500) NULL,
    [ExceptionReason4] [nvarchar](500) NULL,
    [ExceptionReason5] [nvarchar](500) NULL,
    [ExceptionReference1] [nvarchar](500) NULL,
    [ExceptionReference2] [nvarchar](500) NULL,
    [ExceptionReference3] [nvarchar](500) NULL,
    [ExceptionReference4] [nvarchar](500) NULL,
    [ExceptionReference5] [nvarchar](500) NULL
) ON [PRIMARY]
GO

下面是一些示例数据:

LEI,Exception.Category,Exception.Reason.1,Exception.Reason.2,Exception.Reason.3,Exception.Reason.4,Exception.Reason.5,Exception.Reference.1,Exception.Reference.2,Exception.Reference.3,Exception.Reference.4,Exception.Reference.5
004L5FPTUREIWK9T2N63,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
00EHHQ2ZHDCFXJCPCL46,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,

这是我调用的相应存储过程,用于将记录存储到 table:

ALTER PROCEDURE [dbo].[save_gleif_reporting_exception]
    @LEI [nvarchar] (500) = NULL,
    @ExceptionCategory [nvarchar] (500) = NULL,
    @ExceptionReason1 [nvarchar] (500) = NULL,
    @ExceptionReason2 [nvarchar] (500) = NULL,
    @ExceptionReason3 [nvarchar] (500) = NULL,
    @ExceptionReason4 [nvarchar] (500) = NULL,
    @ExceptionReason5 [nvarchar] (500) = NULL,
    @ExceptionReference1 [nvarchar] (500) = NULL,
    @ExceptionReference2 [nvarchar] (500) = NULL,
    @ExceptionReference3 [nvarchar] (500) = NULL,
    @ExceptionReference4 [nvarchar] (500) = NULL,
    @ExceptionReference5 [nvarchar] (500) = NULL,
    @ida_last_update_source_file [nvarchar] (500) NULL
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON;

    -- Insert statements for procedure here
    INSERT INTO dbo.gleif_reporting_exceptions(
        [LEI],
        [ExceptionCategory],
        [ExceptionReason1],
        [ExceptionReason2],
        [ExceptionReason3],
        [ExceptionReason4],
        [ExceptionReason5],
        [ExceptionReference1],
        [ExceptionReference2],
        [ExceptionReference3],
        [ExceptionReference4],
        [ExceptionReference5],
        [ida_last_update_date],
        [ida_last_update_source_file]
    )
    VALUES (
        @LEI,
        @ExceptionCategory,
        @ExceptionReason1,
        @ExceptionReason2,
        @ExceptionReason3,
        @ExceptionReason4,
        @ExceptionReason5,
        @ExceptionReference1,
        @ExceptionReference2,
        @ExceptionReference3,
        @ExceptionReference4,
        @ExceptionReference5,
        GETDATE(),
        @ida_last_update_source_file
    
    )

    SELECT @@IDENTITY

END

注意 1:虽然我将字符串声明为 nvarchar (500),但它们中的大多数都没有那么长。我认为这不重要。我尝试使用较短的字符串定义,运行 例程仍然需要很长时间。

注2:这只是目前7个例子中的一个。最小的 table 大约有几十个 K 行,多达几百万个。列数在 7 到大约 230 之间变化。

关闭自动提交

 conn = pyodbc.connect(
        r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name + '; \
        Database=' + db_name + ';Trusted_Connection=yes;', timeout=5, autocommit=False)
 

并在此处和循环结束时提交。

    if index % 1000 == 0:
            print("Imported %s rows" % (index))

使用自动提交,您必须在每一行之后等待日志文件保存到磁盘。

要进一步优化,如果您使用的是 SQL 2016+,请使用 JSON 将成批的行发送到 SQL 服务器,在服务器端使用 [=12= 进行解析].

因为您不需要存储过程中的 return 值,您应该可以只使用 pandas' to_sql 方法将行直接插入到 table。此代码...

from time import time
import pandas as pd
import sqlalchemy as sa

from_engine = sa.create_engine("mssql+pyodbc://@mssqlLocal64")
to_engine = sa.create_engine(
    "mssql+pyodbc://sa:_whatever_@192.168.0.199/mydb"
    "?driver=ODBC+Driver+17+for+SQL+Server",
    fast_executemany=False,
)

# set up test
to_cnxn = to_engine.raw_connection()
to_cnxn.execute("TRUNCATE TABLE MillionRows")
to_cnxn.commit()
num_rows_to_upload = 10000
df = pd.read_sql_query(
    f"SELECT TOP {num_rows_to_upload} "
    "[TextField], [LongIntegerField], [DoubleField], [varchar_column] "
    "FROM MillionRows ORDER BY ID",
    from_engine,
)

# run test
t0 = time()
df.to_sql("MillionRows", to_engine, index=False, if_exists="append")
s = f"{(time() - t0):0.1f} seconds"
print(f"uploading {num_rows_to_upload:,d} rows took {s}")

… 表示与您现在所做的工作大致相同的内部工作水平,即将每一行作为单独的 .execute 调用上传。结果是

uploading 10,000 rows took 60.2 seconds

但是,简单地将 to_engine 更改为使用 fast_executemany=True 会导致

uploading 10,000 rows took 1.4 seconds