Python Pyodbc 无法 return 首先通过 executemany 插入 id

Python Pyodbc cannot return first inserted id via executemany

设置

MWE:我在 SQL 服务器中有一个 table

CREATE TABLE dbo.MyTable(
    order_id INT IDENTITY(1,1),
    column2 DECIMAL,
    column3 INT
    PRIMARY KEY(order_id)
)

我正在使用 pyodbc 将一些数据以 pandas.DataFrame 的形式插入到 table 中。我使用的数据如下:

   column2  column3
0     1.23        5
1     4.95        9
2     6.79       10

我在其中使用

创建了这个示例数据框
 data = pd.DataFrame({'column2':[1.23, 4.95, 6.79], 'column3':[5,9,10]})

我使用下面的语句插入数据

stmt = "INSERT INTO dbo.MyTable(column2, column3) OUTPUT Inserted.order_id VALUES (?, ?)"

问题

这是我用来插入所有内容并return输入值的代码:

# Set up connection and create cursor
conn_string = "DRIVER={MyDriver};SERVER=MyServer;DATABASE=MyDb;UID=MyUID;PWD=MyPWD"
cnxn = pyodbc.connect(conn_string)
cnxn.autocommit = False
cursor = cnxn.cursor()
cursor.fast_executemany = True
# Upload data
cursor.executemany(stmt, data.values.tolist())
# Process the result
try:
    first_result = cursor.fetchall()
except pyodbc.ProgrammingError:
    first_result = None
result_sets = []
while cursor.nextset():
    result_sets.append(cursor.fetchall())
all_inserted_ids = np.array(result_sets).flatten()

但是,我没有得到所有我应该得到的id!比如假设table里面没有数据,我就不会得到

all_inserted_ids = np.array([1, 2, 3])

但我只会得到

all_inserted_ids = np.array([2, 3])

这意味着我在某处丢失了第一个 ID!

请注意 first_result 永远不会起作用。它总是抛出以下内容:

pyodbc.ProgrammingError: No results.  Previous SQL was not a query.

我也试过使用 cursor.fetchone()cursor.fetchone()[0]cursor.fetchval(),但他们给我同样的错误。

我试过但没有用的方法

1) 添加"SET NOCOUNT ON"

我尝试使用与问题中相同的代码,但

stmt = 
"""
SET NOCOUNT ON; 
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id 
VALUES (?, ?)
"""

输出是 [1, 2],所以我遗漏了 3

2) 添加 "SET NOCOUNT ON" 并将输出插入到 table 变量

我使用了以下语句:

stmt = 
"""
SET NOCOUNT ON; 
DECLARE @NEWID TABLE(ID INT); 
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id INTO @NEWID(ID) 
VALUES (?, ?) 
SELECT ID FROM @NEWID
"""

同样这没有用,因为我只获得了“[2, 3]”但没有获得“1”。

3) 选择@@IDENTITY

我使用了以下语句:

stmt = 
"""
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id 
VALUES (?, ?)
SELECT @@IDENTITY
"""

但是没有用,因为我得到了array([Decimal('1'), 2, Decimal('2'), 3, Decimal('3')]

4) 选择@@IDENTITY 并设置 NOCOUNT ON

我用了

stmt = 
"""
SET NOCOUNT ON
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id
VALUES (?, ?);
SELECT @@IDENTITY
"""

但我又得到了array([Decimal('1'), 2, Decimal('2'), 3, Decimal('3')], dtype=object)

5) 在不使用输出的情况下选择@@IDENTITY

我用过:

stmt = 
"""
INSERT INTO dbo.MyTable(column2, column3) 
VALUES (?, ?);
SELECT @@IDENTITY
"""

但是我得到了[Decimal('2') Decimal('3')]

6) 在不使用 OUTPUT 的情况下选择 @@IDENTITY,但使用 SET NOCOUNT ON

我用过:

stmt = 
"""
SET NOCOUNT ON
INSERT INTO dbo.MyTable(column2, column3) 
VALUES (?, ?);
SELECT @@IDENTITY
"""

但我又得到了:[Decimal('2') Decimal('3')]

解决此问题的可能方法,这确实很糟糕,但可以完成工作

一种可能的方法是创建一个新的 table,我们将在其中存储 ID 并在完成后将其截断。这太可怕了,但我找不到任何其他解决方案..

创建 table:

CREATE TABLE NEWID(
    ID INT
    PRIMARY KEY (ID)
)

接下来是完整代码:

import pyodbc
import pandas as pd
import numpy as np
# Connect
conn_string = """
DRIVER={MYDRIVER};
SERVER=MYSERVER;
DATABASE=DB;
UID=USER;
PWD=PWD
"""
cnxn = pyodbc.connect(conn_string)
cnxn.autocommit = False
cursor = cnxn.cursor()
cursor.fast_executemany = True
# Data, Statement, Execution
data = pd.DataFrame({'column2': [1.23, 4.95, 6.79], 'column3': [5, 9, 10]})
stmt = """
INSERT INTO dbo.MyTable(column2, column3) 
OUTPUT Inserted.order_id INTO NEWID(ID)
VALUES (?, ?);
"""
cursor.executemany(stmt, data.values.tolist())
cursor.execute("SELECT ID FROM NEWID;")
# Get stuff
try:
    first_result = cursor.fetchall()
except pyodbc.ProgrammingError:
    first_result = None
result_sets = []
while cursor.nextset():
    result_sets.append(cursor.fetchall())
all_inserted_ids = np.array(result_sets).flatten()
print('First result: ', first_result)
print('All IDs: ', all_inserted_ids)
cursor.commit()
# Remember to truncate the table for next use
cursor.execute("TRUNCATE TABLE dbo.NEWID;", [])
cursor.commit()

这将 return

First result:  [(1, ), (2, ), (3, )]
All IDs:  []

所以我们只保留第一个结果。

我已经实现了一个与您的方法 1) 相似的方法,使用带有 pyodbc 方言的 sqlAlchemy。它可以很容易地直接适应 pyodbc 库。诀窍是在插入查询之前有一个 SELECT NULL; 。这样,插入查询的第一个输出将在 returned 集中。使用此方法,如果您插入 n 行,您将需要使用游标的 nextset() 获取 2n-1 集。 这是一个补丁,因为 MSSQL 或 pyodbc 丢弃了第一组。我想知道是否有一个选项是 MSSQL 服务器或 pyodbc,您可以在其中指定 return 第一组。

from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import TableClause

def bulk_insert_return_defaults_pyodbc(
    session: Session, statement: TableClause, parameters: List[dict], mapping: dict
):
    """

    Parameters
    ----------
    session:
        SqlAlchemy Session object
    statement:
        SqlAlchemy table clause object (ie. Insert)
    parameters:
        List of parameters
        ex: [{"co1": "value1", "col2": "value2"}, {"co1": "value3", "col2": "value4"}]
    mapping
        Mapping between SqlAlchemy declarative base attribute and name of column in 
        database

    Returns
    -------

    """
    if len(parameters) > 0:
        connexion = session.connection()
        context = session.bind.dialect.execution_ctx_cls._init_statement(
            session.bind.dialect,
            connexion,
            connexion._Connection__connection.connection,
            statement,
            parameters,
        )
        statement = context.statement.compile(
            session.bind, column_keys=list(context.parameters[0].keys())
        )
        session.bind.dialect.do_executemany(
            context.cursor,
            "SELECT NULL; " + str(statement),
            [
                tuple(p[p_i] for p_i in statement.params.keys())
                for p in context.parameters
            ],
            context,
        )
        results = []

        while context.cursor.nextset():
            try:
                result = context.cursor.fetchone()
                if result[0] is not None:
                    results.append(result)
            except Exception:
                continue

        return [
            {mapping[r.cursor_description[i][0]]: c for i, c in enumerate(r)}
            for r in results
        ]
    else:
        return []

multi_params = bulk_insert_return_defaults_pyodbc(
    session,
    table_cls.__table__.insert(returning=[table_cls.id]),
    multi_params,
    {
        getattr(table_cls, c).expression.key: c
        for c in list(vars(table_cls))
        if isinstance(getattr(table_cls, c), InstrumentedAttribute)
    },
)