SSIS 自定义数据流组件循环输入管道缓冲区不止一次
SSIS Custom Data Flow Component loop input pipeline buffer more than once
我仍在学习如何在 SSIS 中创建自定义组件。假设我有 500 行的输入行数,我需要从输入管道缓冲区一次 read/batch/process 100 行,然后将它们发送到第 3 方应用程序,一旦我得到结果,我需要更新包含新数据的管道缓冲区列,然后是 read/batch/process 接下来的 100 行,依此类推,直到我处理完所有 500 行。
我的问题是,我可以 loop/read 多次通过输入管道缓冲区,以便我可以使用从第 3 方应用程序返回的数据更新缓冲区吗?
我以为我读到过您可以读入所有数据并将其存储到缓存中,然后例如对数据进行排序,但我不确定如何将该数据从缓存中取回输出。我也不确定应该在哪里完成以及如何访问输入管道缓冲区、PrimeOutput 或 ProcessInput 或我不知道的其他覆盖方法?
我正在尝试创建一个自定义异步数据流组件来解决这个问题。
如有任何帮助或想法,我们将不胜感激and/or 为我指明正确的方向!
谢谢!
我很高兴我没有尝试写意,因为我忘记了很多要点。
这里有几点值得注意:我的两个数据结构 InData
和 OutData
您需要配置它们以跟踪 input/output 缓冲区。正如评论所述,可能有一种聪明的方法来克隆 Buffer 对象的属性,但我不知道如何去做。定义这些以匹配数据流中的数据类型,如果您像我一样懒惰,请使用相同的列名,您可以 copy/paste 通往成功之路。
ApiCall
是一个虚拟方法,它使用我们的缓存值来请求数据清理服务执行它的操作。它需要 return 清理后的数据,以便我们可以将输入和输出结合到一个统一的行中。可能有更好的方法来做到这一点,但希望它足以激发您的思维过程。
我创建了一个 SSIS 级别变量,@[User::ApiBatchSize]
,您可以将其初始化为 500。使用这种方法可以让您在不更改核心代码的情况下优化发送的批处理大小。我在 PreExecute
方法中初始化我们的本地成员变量,因为这是脚本组件的 constructor-ish。
通常,在异步脚本组件中,您使用的是 ProcessInputRow
方法,这就是我最初所做的,但是 运行 如果列表的大小,最终批次会出现问题apiBatchSize 的偶数倍。事实证明,EndOfRowset()
从未在该方法中设置为 True。不用担心,我们只需要使用 ProcessInput
方法即可。在“正常”世界中,处理输入方法会导致处理输入行处理一行,因此我们将跳过中间人并直接使用 ProcessInput 中的缓冲区。我很懒,没有将我的 Row
引用重命名为 Buffer
,因为 auto-generated 代码最初处理了参数。
这里的伪逻辑是
- 当有数据行时
-
- 如果我们达到了批量大小,请发送我们的数据收集进行处理
-
-
- 对于每个处理过的行,将一行添加到输出缓冲区并用干净的数据填充它
-
- 清空我们的收集桶(已经发送到下游)
- 将当前行添加到我们的收集桶
C# 本身
using System;
using System.Data;
using System.Collections.Generic;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct InData
{
public string AddressLine1 { get; set; }
}
/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct OutData
{
public string AddressLine1Clean { get; set; }
public string AddressCityClean { get; set; }
public string AddressStateClean { get; set; }
public string AddressPostalCodeClean { get; set; }
}
/// <summary>
/// This is the class to which to add your code. Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
List<InData> mData;
int mBatchSize;
/// <summary>
/// This method is called once, before rows begin to be processed in the data flow.
///
/// You can remove this method if you don't need to do anything here.
/// </summary>
public override void PreExecute()
{
base.PreExecute();
this.mData = new List<InData>();
this.mBatchSize = this.Variables.ApiBatchSize;
}
/// <summary>
/// This method is called after all the rows have passed through this component.
///
/// You can delete this method if you don't need to do anything here.
/// </summary>
public override void PostExecute()
{
base.PostExecute();
}
/// <summary>
/// We're going to work with ProcessInput versus PorcessInputRow as it is
/// "closer to the bare metal" and we need that
/// </summary>
/// <param name="Buffer"></param>
public override void Input0_ProcessInput(Input0Buffer Row)
{
//base.Input0_ProcessInput(Buffer);
while (Row.NextRow())
{
if (this.mData.Count >= this.mBatchSize)
{
foreach (var item in ApiCall())
{
Output0Buffer.AddRow();
var inRow = item.Key;
var outRow = item.Value;
// fill columns with original data
Output0Buffer.AddressLine1 = inRow.AddressLine1;
// etc
// fill columns with clean data
Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
Output0Buffer.AddressCityClean = outRow.AddressCityClean;
Output0Buffer.AddressStateClean = outRow.AddressStateClean;
Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
// etc
}
// TODO Remove this for production, just ensuring batching is working as intended
bool fireAgain = false;
string status = "Batch released. Conditions => mDataCount := " + this.mData.Count;
this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);
// Reset for next iteration
this.mData.Clear();
}
this.mData.Add(new InData() { AddressLine1 = Row.AddressLine1 });
}
// Handle the final possible partial batch
if (this.mData.Count > 0)
{
foreach (var item in ApiCall())
{
Output0Buffer.AddRow();
var inRow = item.Key;
var outRow = item.Value;
// fill columns with original data
Output0Buffer.AddressLine1 = inRow.AddressLine1;
// etc
// fill columns with clean data
Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
Output0Buffer.AddressCityClean = outRow.AddressCityClean;
Output0Buffer.AddressStateClean = outRow.AddressStateClean;
Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
// etc
}
// TODO Remove this for production, just ensuring batching is working as intended
bool fireAgain = false;
string status = "Final batch released. Conditions => mDataCount := " + this.mData.Count;
this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);
// Reset for next iteration
this.mData.Clear();
}
}
///// <summary>
///// This method is called once for every row that passes through the component from Input0.
///// We need to preserve rows in our own memory allocation
///// We're not getting the EndOfRowset call in time to release the final
///// </summary>
///// <param name="Row">The row that is currently passing through the component</param>
//public override void Input0_ProcessInputRow(Input0Buffer Row)
//{
//}
public override void CreateNewOutputRows()
{
// I don't think we need to do anything special here
// but I'm leaving it in in case you have some weird case
}
/// <summary>
/// Simulate data cleaning
/// </summary>
/// <returns></returns>
public Dictionary<InData, OutData> ApiCall()
{
int macGuffin = 0;
Dictionary<InData, OutData> cleanData = new Dictionary<InData, OutData>();
foreach (var item in this.mData)
{
cleanData.Add(item, new OutData() { AddressLine1Clean = "Clean" + item.AddressLine1, AddressCityClean = "Clean", AddressPostalCodeClean = "12345-1234", AddressStateClean = "CL" });
macGuffin = macGuffin % this.mBatchSize;
}
return cleanData;
}
}
脚本组件的屏幕截图
这是我们让脚本组件可以使用 SSIS 级别变量的地方。我有 selected ApiBatchSize
在“输入列”选项卡中,我 select 编辑了所有需要通过的列,并将它们标记为只读使用类型。
在“输入和输出”选项卡中,我做的第一件事是导航到 Output 0
并将 SynchronousInputID
从“脚本 Component.Inputs[输入 0]”之类的内容更改为 None
定义您需要的所有列。我复制我的原始列 (AddressLine1),然后添加我的处理将能够填充的所有新列(AddressLine1Clean,city/state/postal 代码)。在输出 0 下,select 输出列集合并重复按“添加列”并进行配置。除了提供名称外,我在这里将所有数据类型更改为字符串 (DT_STR),因为这是我正在使用的。默认为 32 位整数类型 (DT_I4)
请注意,此屏幕截图中没有原始列,但您需要添加它才能使代码正常工作。
那里可能有更新的书,但是当我 我 在我 运行进入脚本问题。
The Rational Guide to Scripting SQL Server 2005 Integration Services Beta Preview (Rational Guides)
by Donald Farmer, Derek Farmer
Paperback, 192 Pages, Published 2005
ISBN-10: 1-932577-21-1 / 1932577211
ISBN-13: 978-1-932577-21-1 / 9781932577211
我仍在学习如何在 SSIS 中创建自定义组件。假设我有 500 行的输入行数,我需要从输入管道缓冲区一次 read/batch/process 100 行,然后将它们发送到第 3 方应用程序,一旦我得到结果,我需要更新包含新数据的管道缓冲区列,然后是 read/batch/process 接下来的 100 行,依此类推,直到我处理完所有 500 行。
我的问题是,我可以 loop/read 多次通过输入管道缓冲区,以便我可以使用从第 3 方应用程序返回的数据更新缓冲区吗?
我以为我读到过您可以读入所有数据并将其存储到缓存中,然后例如对数据进行排序,但我不确定如何将该数据从缓存中取回输出。我也不确定应该在哪里完成以及如何访问输入管道缓冲区、PrimeOutput 或 ProcessInput 或我不知道的其他覆盖方法?
我正在尝试创建一个自定义异步数据流组件来解决这个问题。
如有任何帮助或想法,我们将不胜感激and/or 为我指明正确的方向!
谢谢!
我很高兴我没有尝试写意,因为我忘记了很多要点。
这里有几点值得注意:我的两个数据结构 InData
和 OutData
您需要配置它们以跟踪 input/output 缓冲区。正如评论所述,可能有一种聪明的方法来克隆 Buffer 对象的属性,但我不知道如何去做。定义这些以匹配数据流中的数据类型,如果您像我一样懒惰,请使用相同的列名,您可以 copy/paste 通往成功之路。
ApiCall
是一个虚拟方法,它使用我们的缓存值来请求数据清理服务执行它的操作。它需要 return 清理后的数据,以便我们可以将输入和输出结合到一个统一的行中。可能有更好的方法来做到这一点,但希望它足以激发您的思维过程。
我创建了一个 SSIS 级别变量,@[User::ApiBatchSize]
,您可以将其初始化为 500。使用这种方法可以让您在不更改核心代码的情况下优化发送的批处理大小。我在 PreExecute
方法中初始化我们的本地成员变量,因为这是脚本组件的 constructor-ish。
通常,在异步脚本组件中,您使用的是 ProcessInputRow
方法,这就是我最初所做的,但是 运行 如果列表的大小,最终批次会出现问题apiBatchSize 的偶数倍。事实证明,EndOfRowset()
从未在该方法中设置为 True。不用担心,我们只需要使用 ProcessInput
方法即可。在“正常”世界中,处理输入方法会导致处理输入行处理一行,因此我们将跳过中间人并直接使用 ProcessInput 中的缓冲区。我很懒,没有将我的 Row
引用重命名为 Buffer
,因为 auto-generated 代码最初处理了参数。
这里的伪逻辑是
- 当有数据行时
-
- 如果我们达到了批量大小,请发送我们的数据收集进行处理
-
-
- 对于每个处理过的行,将一行添加到输出缓冲区并用干净的数据填充它
-
-
- 清空我们的收集桶(已经发送到下游)
- 将当前行添加到我们的收集桶
C# 本身
using System;
using System.Data;
using System.Collections.Generic;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct InData
{
public string AddressLine1 { get; set; }
}
/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct OutData
{
public string AddressLine1Clean { get; set; }
public string AddressCityClean { get; set; }
public string AddressStateClean { get; set; }
public string AddressPostalCodeClean { get; set; }
}
/// <summary>
/// This is the class to which to add your code. Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
List<InData> mData;
int mBatchSize;
/// <summary>
/// This method is called once, before rows begin to be processed in the data flow.
///
/// You can remove this method if you don't need to do anything here.
/// </summary>
public override void PreExecute()
{
base.PreExecute();
this.mData = new List<InData>();
this.mBatchSize = this.Variables.ApiBatchSize;
}
/// <summary>
/// This method is called after all the rows have passed through this component.
///
/// You can delete this method if you don't need to do anything here.
/// </summary>
public override void PostExecute()
{
base.PostExecute();
}
/// <summary>
/// We're going to work with ProcessInput versus PorcessInputRow as it is
/// "closer to the bare metal" and we need that
/// </summary>
/// <param name="Buffer"></param>
public override void Input0_ProcessInput(Input0Buffer Row)
{
//base.Input0_ProcessInput(Buffer);
while (Row.NextRow())
{
if (this.mData.Count >= this.mBatchSize)
{
foreach (var item in ApiCall())
{
Output0Buffer.AddRow();
var inRow = item.Key;
var outRow = item.Value;
// fill columns with original data
Output0Buffer.AddressLine1 = inRow.AddressLine1;
// etc
// fill columns with clean data
Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
Output0Buffer.AddressCityClean = outRow.AddressCityClean;
Output0Buffer.AddressStateClean = outRow.AddressStateClean;
Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
// etc
}
// TODO Remove this for production, just ensuring batching is working as intended
bool fireAgain = false;
string status = "Batch released. Conditions => mDataCount := " + this.mData.Count;
this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);
// Reset for next iteration
this.mData.Clear();
}
this.mData.Add(new InData() { AddressLine1 = Row.AddressLine1 });
}
// Handle the final possible partial batch
if (this.mData.Count > 0)
{
foreach (var item in ApiCall())
{
Output0Buffer.AddRow();
var inRow = item.Key;
var outRow = item.Value;
// fill columns with original data
Output0Buffer.AddressLine1 = inRow.AddressLine1;
// etc
// fill columns with clean data
Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
Output0Buffer.AddressCityClean = outRow.AddressCityClean;
Output0Buffer.AddressStateClean = outRow.AddressStateClean;
Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
// etc
}
// TODO Remove this for production, just ensuring batching is working as intended
bool fireAgain = false;
string status = "Final batch released. Conditions => mDataCount := " + this.mData.Count;
this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);
// Reset for next iteration
this.mData.Clear();
}
}
///// <summary>
///// This method is called once for every row that passes through the component from Input0.
///// We need to preserve rows in our own memory allocation
///// We're not getting the EndOfRowset call in time to release the final
///// </summary>
///// <param name="Row">The row that is currently passing through the component</param>
//public override void Input0_ProcessInputRow(Input0Buffer Row)
//{
//}
public override void CreateNewOutputRows()
{
// I don't think we need to do anything special here
// but I'm leaving it in in case you have some weird case
}
/// <summary>
/// Simulate data cleaning
/// </summary>
/// <returns></returns>
public Dictionary<InData, OutData> ApiCall()
{
int macGuffin = 0;
Dictionary<InData, OutData> cleanData = new Dictionary<InData, OutData>();
foreach (var item in this.mData)
{
cleanData.Add(item, new OutData() { AddressLine1Clean = "Clean" + item.AddressLine1, AddressCityClean = "Clean", AddressPostalCodeClean = "12345-1234", AddressStateClean = "CL" });
macGuffin = macGuffin % this.mBatchSize;
}
return cleanData;
}
}
脚本组件的屏幕截图
这是我们让脚本组件可以使用 SSIS 级别变量的地方。我有 selected ApiBatchSize
在“输入列”选项卡中,我 select 编辑了所有需要通过的列,并将它们标记为只读使用类型。
在“输入和输出”选项卡中,我做的第一件事是导航到 Output 0
并将 SynchronousInputID
从“脚本 Component.Inputs[输入 0]”之类的内容更改为 None
定义您需要的所有列。我复制我的原始列 (AddressLine1),然后添加我的处理将能够填充的所有新列(AddressLine1Clean,city/state/postal 代码)。在输出 0 下,select 输出列集合并重复按“添加列”并进行配置。除了提供名称外,我在这里将所有数据类型更改为字符串 (DT_STR),因为这是我正在使用的。默认为 32 位整数类型 (DT_I4)
请注意,此屏幕截图中没有原始列,但您需要添加它才能使代码正常工作。
那里可能有更新的书,但是当我 我 在我 运行进入脚本问题。
The Rational Guide to Scripting SQL Server 2005 Integration Services Beta Preview (Rational Guides) by Donald Farmer, Derek Farmer Paperback, 192 Pages, Published 2005 ISBN-10: 1-932577-21-1 / 1932577211 ISBN-13: 978-1-932577-21-1 / 9781932577211