SQL 服务器存储过程 - 在执行插入时插入良好记录并捕获错误记录 select

SQL Server stored procedure - Insert good records and capture error records while doing Insert select

我正在尝试通过从暂存 table 中选择来插入 table,由于某些 'not null' 类型的列和数据类型,某些记录会抛出错误。

如何插入非错误记录并在另一个中捕获错误记录table?

问题是我不想停止插入,因为它有数百万条良好记录。另外,我无法预测哪一列会抛出错误。

我试着玩 TRY CATCH,但没用。我可以得到错误,但没有插入好的记录。

示例代码:

CREATE PROCEDURE Procedure1  
    @FileName varchar(240), 
    @Status varchar(50), 
    @Count int, 
    @Date datetime

    DECLARE @FileId AS BIGINT, @linkId AS BIGINT
BEGIN
    BEGIN TRAN
        INSERT INTO finalTable (Date, billno, vendor, vendorId, repName, location)
            SELECT 
                Date, billno, vendor, vendorId, 
                CONCAT(repFirstName + repLastName),
                CAST(NULLIF(Location, '') AS int) Location

        INSERT INTO summaryTable (Date, billno, vendor)
            SELECT Date, billno, vendor 
            FROM finalTable 
            WHERE id > SELECT MAX(id) FROM finalTable

        COMMIT TRANSACTION
END

预期结果:插入了 157828 - 157825 条记录,其中 3 条记录被推入错误 table 与主 table 的结构相似所有列的类型为 nvarchar 并接受任何类型的记录。

我会通过导入 table 来解决它,就像你建议的那样,将所有内容都设置为 NVARCHAR(MAX) NULL,然后在 table 上创建一个视图,以确定是否尚未导入的记录是否有效,有一个 IsValid 列和其他列已经转换为目标格式,并且在导入 table 上的 INSERT 上有一个 AFTER 触发器,它将记录插入到 final- 和错误 tables 分别用

之类的东西
INSERT INTO finalTable (Date, billno, vendor, vendorId, repName, location)
  SELECT mv.Date, mv.billno, mv.vendor, mv.vendorId, mv.repName, mv.location FROM
    inserted i INNER JOIN MyImortView mv ON i.ID = mv.ID WHERE mv.IsValid = 1;

INSERT INTO errorTable (Date, billno, vendor, vendorId, repName, location)
  SELECT i.Date, i.billno, i.vendor, i.vendorId, i.repName, i.location FROM
    inserted i INNER JOIN MyImortView mv ON i.ID = mv.ID WHERE mv.IsValid = 0;

顺便说一下,您不应该使用这样的代码结构:

WHERE id > SELECT MAX(id) FROM finalTable

那是一个随时可能爆炸的 time-bomb。

此处 step-by-step 根据您在示例中提供的一些列名称对我的意思进行了解释。由于我没有你的 table 定义,我假设了一些东西,你必须根据你的需要调整它。

创建StagingTable。我选择使用 GUID,以便能够对所有 3 个 table (StagingTable_Id = FinalTable_Id = ErrorTable_Id) 使用相同的 ID。使用 GUID 时,请确保不要将其作为 table(聚集键)中记录的物理排序顺序。当记录存储在 StagingTable 而不是所有 3 table 时,我选择 point-in-time。

CREATE TABLE dbo.StagingTable(
    StagingTable_Id UNIQUEIDENTIFIER ROWGUIDCOL NOT NULL,
    StagingTable_ImportDateUtc DATETIME2(7) NOT NULL,
    StagingTable_DateTime NVARCHAR(MAX) NULL,
    StagingTable_BillNo NVARCHAR(MAX) NULL,
    StagingTable_VendorId NVARCHAR(MAX) NULL,
    StagingTable_RepFirstName NVARCHAR(MAX) NULL,
    StagingTable_RepLastName NVARCHAR(MAX) NULL,
    StagingTable_LocationId NVARCHAR(MAX) NULL,
    CONSTRAINT PK_StagingTable PRIMARY KEY NONCLUSTERED (StagingTable_Id ASC) ON [PRIMARY]
) ON [PRIMARY];
ALTER TABLE dbo.StagingTable ADD CONSTRAINT DF_StagingTable_StagingTable_ID DEFAULT (NEWID()) FOR StagingTable_Id;
ALTER TABLE dbo.StagingTable ADD CONSTRAINT DF_StagingTable_StagingTable_ImportDateUtc DEFAULT (SYSUTCDATETIME()) FOR StagingTable_ImportDateUtc;
ALTER TABLE dbo.StagingTable SET (LOCK_ESCALATION = DISABLE);
CREATE CLUSTERED INDEX IX_StagingTable01 ON dbo.StagingTable (StagingTable_ImportDateUtc ASC) ON [PRIMARY];

错误 table 与 StagingTable 的结构相同:

CREATE TABLE dbo.ErrorTable(
    ErrorTable_Id UNIQUEIDENTIFIER ROWGUIDCOL NOT NULL,
    ErrorTable_ImportDateUtc DATETIME2(7) NOT NULL,
    ErrorTable_DateTime NVARCHAR(MAX) NULL,
    ErrorTable_BillNo NVARCHAR(MAX) NULL,
    ErrorTable_VendorId NVARCHAR(MAX) NULL,
    ErrorTable_RepFirstName NVARCHAR(MAX) NULL,
    ErrorTable_RepLastName NVARCHAR(MAX) NULL,
    ErrorTable_LocationId NVARCHAR(MAX) NULL,
    CONSTRAINT PK_ErrorTable PRIMARY KEY NONCLUSTERED (ErrorTable_Id ASC) ON [PRIMARY]
) ON [PRIMARY];
ALTER TABLE dbo.ErrorTable ADD CONSTRAINT DF_ErrorTable_ErrorTable_ID DEFAULT (NEWID()) FOR ErrorTable_Id;
ALTER TABLE dbo.ErrorTable SET (LOCK_ESCALATION = DISABLE);
CREATE CLUSTERED INDEX IX_ErrorTable01 ON dbo.ErrorTable (ErrorTable_ImportDateUtc ASC) ON [PRIMARY];

但是最后的table已经有了转换数据类型的结构:

CREATE TABLE dbo.FinalTable(
    FinalTable_Id UNIQUEIDENTIFIER ROWGUIDCOL NOT NULL,
    FinalTable_ImportDateUtc DATETIME2(7) NOT NULL,
    FinalTable_DateTime DATETIME2(7) NOT NULL,
    FinalTable_BillNo INT NOT NULL,
    FinalTable_VendorId INT NOT NULL,
    FinalTable_RepName NVARCHAR(60) NOT NULL,
    FinalTable_LocationId INT NOT NULL,
    CONSTRAINT PK_FinalTable PRIMARY KEY NONCLUSTERED (FinalTable_Id ASC) ON [PRIMARY]
) ON [PRIMARY];
ALTER TABLE dbo.FinalTable ADD CONSTRAINT DF_FinalTable_FinalTable_Id DEFAULT (NEWID()) FOR FinalTable_Id;
ALTER TABLE dbo.FinalTable SET (LOCK_ESCALATION = DISABLE);
CREATE CLUSTERED INDEX IX_FinalTable01 ON dbo.FinalTable (FinalTable_ImportDateUtc ASC) ON [PRIMARY];

接下来我们创建转化视图,它有两个用途:

  • 它过滤视图只包含尚未处理的记录(其id既不在FinalTable也不在ErrorTable)。

  • 它尝试将 StagingTable 记录的每个字段转换为它们的目标格式。对于转换失败的字段,它分配 NULL.

CREATE VIEW dbo.V_StagingConversion
AS
SELECT 
    st.StagingTable_Id, 
    st.StagingTable_ImportDateUtc, 
    st.StagingTable_DateTime, 
    TRY_CONVERT(datetime2, st.StagingTable_DateTime, 121) AS FinalTable_DateTime,
    st.StagingTable_BillNo,
    TRY_CAST(st.StagingTable_BillNo AS INT) AS FinalTable_BillNo,
    st.StagingTable_VendorId,
    TRY_CAST(st.StagingTable_VendorId AS INT) AS FinalTable_VendorId,
    st.StagingTable_RepFirstName, 
    st.StagingTable_RepLastName,
    RTRIM(LTRIM(ISNULL(st.StagingTable_RepFirstName, '') + N' ' + ISNULL(st.StagingTable_RepLastName, ''))) AS FinalTable_RepName, 
    st.StagingTable_LocationId,
    TRY_CAST(st.StagingTable_LocationId AS INT) AS FinalTable_LocationId
FROM StagingTable st
LEFT OUTER JOIN FinalTable ft ON st.StagingTable_Id = ft.FinalTable_Id
LEFT OUTER JOIN ErrorTable et ON st.StagingTable_Id = et.ErrorTable_Id
WHERE (ft.FinalTable_Id IS NULL) AND (et.ErrorTable_Id IS NULL);

然后我们创建一个基于前一个视图的视图,该视图决定记录是否有效:

CREATE VIEW dbo.V_StagingValidation
AS
SELECT 
    *, 
    CAST(CASE WHEN FinalTable_DateTime IS NULL THEN 0 ELSE 
            CASE WHEN FinalTable_BillNo IS NULL THEN 0 ELSE 
                CASE WHEN FinalTable_VendorId IS NULL THEN 0 ELSE 
                    CASE WHEN (LEN(FinalTable_RepName) = 0) OR (LEN(FinalTable_RepName) > 60) THEN 0 ELSE 
                        CASE WHEN FinalTable_LocationId IS NULL THEN 0 ELSE 1 END 
                    END 
                END 
            END 
         END AS BIT) AS StagingTable_IsValid 
FROM dbo.V_StagingConversion;

接下来我们创建代表未来状态的视图,并用于各自的 INSERT 查询:

CREATE VIEW dbo.V_StagingPublicationFinal
AS
SELECT StagingTable_Id AS FinalTable_Id, StagingTable_ImportDateUtc AS FinalTable_ImportDateUtc, FinalTable_DateTime, FinalTable_BillNo, FinalTable_VendorId, FinalTable_RepName, FinalTable_LocationId 
FROM dbo.V_StagingValidation WHERE StagingTable_IsValid = 1;

CREATE VIEW dbo.V_StagingPublicationError
AS
SELECT StagingTable_Id AS ErrorTable_Id, StagingTable_ImportDateUtc AS ErrorTable_ImportDateUtc, StagingTable_DateTime AS ErrorTable_DateTime, StagingTable_BillNo AS ErrorTable_BillNo, StagingTable_VendorId AS ErrorTable_VendorId, StagingTable_RepFirstName AS ErrorTable_RepFirstName, StagingTable_RepLastName AS ErrorTable_RepLastName, StagingTable_LocationId AS ErrorTable_LocationId 
FROM dbo.V_StagingValidation WHERE StagingTable_IsValid = 0;

和一个视图,显示我们需要 DELETE 查询的所有已处理记录(如果您不喜欢在插入和处理后立即删除 StagingTable 记录,请取消注释根据存储过程中的语句)。

CREATE VIEW dbo.V_StagingPublicationDone
AS
SELECT st.*, CAST(CASE WHEN ft.FinalTable_Id IS NULL THEN 0 ELSE 1 END AS BIT) AS StagingTable_IsValid FROM StagingTable st
LEFT OUTER JOIN FinalTable ft ON st.StagingTable_Id = ft.FinalTable_Id
LEFT OUTER JOIN ErrorTable et ON st.StagingTable_Id = et.ErrorTable_Id
WHERE (ft.FinalTable_Id IS NOT NULL) OR (et.ErrorTable_Id IS NOT NULL);

然后我决定将导入逻辑放入存储过程中,这样可以灵活地手动执行它,例如如果触发器被禁用,因为必须尝试一些东西或调整一些东西。

CREATE PROCEDURE dbo.ImportStagingTable
AS
BEGIN
    SET NOCOUNT ON;
    INSERT INTO dbo.FinalTable 
      SELECT FinalTable_Id, FinalTable_ImportDateUtc, FinalTable_DateTime, FinalTable_BillNo, FinalTable_VendorId, FinalTable_RepName, FinalTable_LocationId FROM V_StagingPublicationFinal p ORDER BY p.FinalTable_ImportDateUtc;
    INSERT INTO dbo.ErrorTable 
      SELECT ErrorTable_Id, ErrorTable_ImportDateUtc, ErrorTable_DateTime, ErrorTable_BillNo, ErrorTable_VendorId, ErrorTable_RepFirstName, ErrorTable_RepLastName, ErrorTable_LocationId FROM V_StagingPublicationError p ORDER BY p.ErrorTable_ImportDateUtc;
    DELETE st FROM dbo.V_StagingPublicationDone spd INNER JOIN dbo.StagingTable st ON spd.StagingTable_Id = st.StagingTable_Id;
END

每个人都应该能够执行这个存储过程:

GRANT EXECUTE ON dbo.ImportStagingTable TO PUBLIC;

最后我们创建了触发器。我添加了一个 COMMIT/BEGIN TRANSACTION 以确保已经写入的 StagingTable 记录不会在后触发器失败时回滚:

CREATE TRIGGER dbo.StagingTableAfterInsert ON dbo.StagingTable AFTER INSERT
AS 
BEGIN
    SET NOCOUNT ON;
    COMMIT TRANSACTION;
    BEGIN TRANSACTION;
    EXECUTE dbo.ImportStagingTable;
END

并启用它(但它可能已经启用):

ALTER TABLE dbo.StagingTable ENABLE TRIGGER StagingTableAfterInsert

现在您可以将记录插入 StagingTable,它们会自动分布到 FinalTableErrorTable