带有事务和 table 变量的批处理存储过程

Batch stored procedure with transactions and table variables

这个周末我有一个数据迁移,然后发现我无法获得 DBA 的帮助,所以我缺乏 SQL 服务器知识。所有其他 DBA 都是 Oracle,不会接触 SQL 服务器。我在安全方面受限,无法创建作业或 SSIS 包来处理此问题。

我有一个脚本,我在其中 运行批量处理。在此批次中,我正在 运行 使用逻辑存储过程。存储过程有 table 个变量,我刚刚了解到您无法 运行 这些作为事务处理。有人可以观察一下这个整体方法,看看我是否遗漏了什么,或者我可以 运行 更有效吗? BigTable 有大约 2500 万条记录,所有索引、FK、约束都被删除了。我打算为这批临时添加一些索引。大约 运行 5 天。

Create Procedure ConvertStuff AS
BEGIN

declare @id uniqueIdentifier
declare @importdate DateTime
declare @Data varchar(max)

declare @tableX table 
    ---
declare @tableY table 
    ---
declare @tableZ table 
    ---


SET NOCOUNT ON
    select top 1 @ID = bt.ID, @Data = bt.RawData, @importDate = bt.ImportDate from Processed p with (NOLOCK)
        Inner join BigTable bt with (NOLOCK) on p.ID = bt.ID where p.isProcessed = 0

    while (not @ID is null)
    Begin
        BEGIN TRY
            --Do stuff here
        END TRY
        BEGIN CATCH

            DECLARE @ErrorMessage NVARCHAR(4000);
            DECLARE @ErrorSeverity INT;
            DECLARE @ErrorState INT;

            SELECT @ErrorMessage = ERROR_MESSAGE(),
                   @ErrorSeverity = ERROR_SEVERITY(),
                   @ErrorState = ERROR_STATE();

            RAISERROR (@ErrorMessage,
                       @ErrorSeverity,
                       @ErrorState
                       );


            update bigTable set isProcessed = -1 where ID = @ID
            break
        END CATCH
    select top 1 @ID = bt.ID, @Data = bt.RawData, @importDate = bt.ImportDate from Processed p with (NOLOCK)
        Inner join BigTable bt with (NOLOCK) on p.ID = bt.ID where p.isProcessed = 0
    END
    --Do I need to drop the @ tables here? Should I convert these to # ?
END

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
-- Running this....

-- This will be dropped once the migration is done
CREATE TABLE [Processed]
(
    [ID] UNIQUEIDENTIFIER NOT NULL PRIMARY KEY,
    [isProcessed] [bit] Default(0) NOT NULL,
)

CREATE NONCLUSTERED INDEX [idx_isProcessed] ON [Processed]
(
    [isProcessed] ASC
)

GO

SET ROWCOUNT 25000

declare @msg varchar(50)
DECLARE @ErrorMessage NVARCHAR(4000)
DECLARE @ErrorSeverity INT
DECLARE @ErrorState INT

While (1=1) 
BEGIN
    BEGIN TRY
    BEGIN TRANSACTION
    Insert into [Processed] (ID, isProcessed) 
        Select ID, 0 from BigTable where recordUpdated = 0

    exec ConvertStuff

    IF @@ROWCOUNT = 0
    BEGIN
        Print @@ROWCOUNT
        COMMIT TRANSACTION
        BREAK
    END

    COMMIT TRANSACTION

    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION


        SELECT @ErrorMessage = ERROR_MESSAGE(),
               @ErrorSeverity = ERROR_SEVERITY(),
               @ErrorState = ERROR_STATE();
        RAISERROR (@ErrorMessage, -- Message text.
                   @ErrorSeverity, -- Severity.
                   @ErrorState -- State.
                   );
        BREAK
    END CATCH
END

drop table Processed

这是有效地批量复制 table 而不会杀死您的系统的正确方法。此策略仅在 table 在复制过程中为只读时才有效。如果您的 table 可以更改,您必须将其与另一种跟踪和更新已更改记录的策略结合使用。

批量复制方法将防止您在 table 上持有 4 天的锁定,并允许您继续定期备份您的事务日志。如果您需要停止它或失败,它还会阻止 4 天的回滚。

运行 这在你发布之前,然后在发布期间再次达到 table 的上限。一如既往,练习 运行 并在尝试实时系统之前停止脚本。

DECLARE @CurrentId UNIQUEIDENTIFIER,
        @BatchSize INT;
SET @BatchSize = 50000;

SELECT TOP 1
    @CurrentId = ID
FROM NewTable
ORDER BY ID DESC;

SELECT
    @LastId = ID
FROM OldTable
ORDER BY ID DESC;

IF (@CurrentId IS NULL)
    SET @CurrentId = '00000000-0000-0000-0000-000000000000';

PRINT 'Copying from ' + CONVERT(VARCHAR(40), @CurrentId) + ' to ' + CONVERT(VARCHAR(40), @LastId);

CREATE TABLE #Batch
(
    ID UNIQUEIDENTIFIER
);

WHILE (@CurrentId < @LastId)
BEGIN
    PRINT CONVERT(VARCHAR(40), @CurrentId);

    TRUNCATE TABLE #Batch;

    -- Get your new batch
    INSERT INTO #Batch
    SELECT TOP (@BatchSize)
        *
    FROM OldTable
    WHERE ID > @CurrentId
    ORDER BY ID;

    -- I'd recommend being specific with columns, you might also need to turn on 'SET IDENTITY_INSERT <Table> ON'
    INSERT INTO NewTable
    SELECT *
    FROM OldTable
    INNER JOIN #Batch ON #Batch.ID = OldTable.ID
    LEFT JOIN NewTable ON NewTable.ID = OldTable.ID
    WHERE NewTable.ID IS NULL;

    IF (@@ERROR <> 0)
        BREAK

    SELECT TOP 1
        @CurrentId = ID
    FROM #Batch
    ORDER BY ID DESC;
END