当供应商可以 added/removed 列时将 CSV 加载到数据库
Load CSV to a database when columns can be added/removed by the vendor
我有一些 SSIS 程序包,它们使用来自供应商的 CSV 文件并将它们放入我们的本地数据库中。我遇到的问题是,有时供应商会添加或删除列,而我们没有时间在下一个 运行 之前更新我们的包,这会导致 SSIS 包异常终止。我想以某种方式防止这种情况发生。
我试过逐行读取 CSV 文件,删除新列,然后使用插入语句将更改后的行放入 table,但这比我们当前的时间要长得多过程(CSV 文件可以有数千或数十万条记录)。
我已经开始研究使用 ADO 连接,但我的本地计算机既没有 ACE 也没有 JET 提供程序,而且我认为部署包的服务器也缺少这些提供程序(我怀疑我能否将它们安装在部署服务器)。
我不知如何做才能加载 tables 并能够忽略新添加或删除的列(尽管如果 CSV 文件缺少列 table 有,这没什么大不了的)快速可靠。有什么想法吗?
查看 BiML,它会在 运行 时根据元数据动态构建和执行您的 SSIS 包。
基于此评论:
I've tried reading in the CSV files line by line, stripping out new
columns, and then using an insert statement to put the altered line
into the table, but that takes far longer than our current process
(the CSV files can have thousands or hundreds of thousands of
records).
还有这个:
I used a csvreader to read the file. The insert was via a sqlcommand
object.
乍一看,瓶颈不在平面文件源,而在目标。 OLEDB 命令以逐行方式执行,每个输入行一个语句。通过将其更改为 OLEDB 目标,它将把过程转换为批量插入操作。要对此进行测试,只需使用平面文件源并将其连接到派生列。 运行 并检查速度。如果速度更快,请更改为 oledb 目标并重试。它还有助于插入堆(无聚集或非聚集索引)并使用 tablock。
但是,这并不能解决您整个文件多样化的问题。如果您在设计时最初配置它的方式少了一列或多列,我不知道平面文件源会做什么。它可能会失败,或者它可能会以某种参差不齐的形式导入行,其中下一行的一部分被分配给当前行的最后一列。那可能会是一团糟。
但是,我知道当平面文件源获得额外的列时会发生什么。我为此添加了这个连接项,但遗憾地被拒绝了:https://connect.microsoft.com/SQLServer/feedback/details/963631/ssis-parses-flat-files-incorrectly-when-the-source-file-contains-unexpected-extra-columns
发生的情况是额外的列连接到最后一列。如果您计划这样做,您可以使最后一列变大,然后从暂存 table 中解析 SQL。此外,您可以将整行塞入 SQL 并从那里解析每一列。虽然这有点笨拙,因为您将有很多 CHARINDEX() 检查所有位置的值的位置。
一个更简单的选择可能是在脚本任务中使用 split() 的某些组合在 .Net 中解析它以获取所有值并检查数组中值的计数以了解您有多少列。这还允许您根据找到的内容将行定向到不同的缓冲区。
最后,您可以要求供应商承诺一种格式。固定数量的列或使用处理变化的格式,如 XML.
我有一个源脚本组件的 C# 解决方案(我没有检查过,但我认为它有效)。
它将使用拆分将 header 读入数组。
然后对每个数据行使用相同的拆分函数并使用 header 值检查列并使用 rowval 设置输出。
您需要将所有输出列放入输出区域。
所有不存在的列在退出时都将具有空值。
public override void CreateNewOutputRows()
{
using (System.IO.StreamReader sr = new System.IO.StreamReader(@"[filepath and name]"))
{
while (!sr.EndOfStream)
{
string FullText = sr.ReadToEnd().ToString();
string[] rows = FullText.Split('\n');
//Get header values
string[] header = rows[0].Split(',');
for (int i = 1; i < rows.Length - 1; i++)
{
string[] rowVals = rows[i].Split(',');
for (int j = 0; j < rowVals.Length - 1; j++)
{
Output0Buffer.AddRow();
//Deal with each known header name
switch (header[j])
{
case "Field 1 Name": //this is where you use known column names
Output0Buffer.FieldOneName = rowVals[j]; //Cast if not string
break;
case "Field 2 Name":
Output0Buffer.FieldTwoName = rowVals[j]; //Cast if not string
break;
//continue this pattern for all column names
}
}
}
}
}
}
我采用了不同的方法,这似乎有效(在我解决了一些问题之后)。我所做的是获取 CSV 文件行并将它们放入临时数据 table。完成后,我从 datatable 批量复制到我的数据库。为了处理丢失的或新的列,我确定了哪些列对于 CSV 和 table 是共同的,并且只处理那些共同的列(新列已记录在日志文件中,以便以后添加)。这是我的 BulkCopy 模块:
Private Sub BulkCopy(csvFile As String)
Dim i As Integer
Dim rowCount As Int32 = 0
Dim colCount As Int32 = 0
Dim writeThis As ArrayList = New ArrayList
tempTable = New DataTable()
Try
'1) Set up the columns in the temporary data table, using commonColumns
For i = 0 To commonColumns.Count - 1
tempTable.Columns.Add(New DataColumn(commonColumns(i).ToString))
tempTable.Columns(i).DataType = GetDataType(commonColumns(i).ToString)
Next
'2) Start adding data from the csv file to the temporary data table
While Not csvReader.EndOfData
currentRow = csvReader.ReadFields() 'Read the next row of the csv file
rowCount += 1
writeThis.Clear()
For index = 0 To UBound(currentRow)
If commonColumns.Contains(csvColumns(index)) Then
Dim location As Integer = tableColumns.IndexOf(csvColumns(index))
Dim columnType As String = tableColumnTypes(location).ToString
If currentRow(index).Length = 0 Then
writeThis.Add(DBNull.Value)
Else
writeThis.Add(currentRow(index))
End If
'End Select
End If
Next
Dim row As DataRow = tempTable.NewRow()
row.ItemArray = writeThis.ToArray
tempTable.Rows.Add(row)
End While
csvReader.Close()
'3) Bulk copy the temporary data table to the database table.
Using copy As New SqlBulkCopy(dbConnection)
'3.1) Set up the column mappings
For i = 0 To commonColumns.Count - 1
copy.ColumnMappings.Add(commonColumns(i).ToString, commonColumns(i).ToString)
Next
'3.2) Set the destination table name
copy.DestinationTableName = tableName
'3.3) Copy the temporary data table to the database table
copy.WriteToServer(tempTable)
End Using
Catch ex As Exception
message = "*****ERROR*****" + vbNewLine
message += "BulkCopy: Encountered an exception of type " + ex.GetType.ToString()
message += ": " + ex.Message + vbNewLine + "***************" + vbNewLine
LogThis(message)
End Try
End Sub
可能还有更优雅的东西,但到目前为止这似乎可行。
我有一些 SSIS 程序包,它们使用来自供应商的 CSV 文件并将它们放入我们的本地数据库中。我遇到的问题是,有时供应商会添加或删除列,而我们没有时间在下一个 运行 之前更新我们的包,这会导致 SSIS 包异常终止。我想以某种方式防止这种情况发生。
我试过逐行读取 CSV 文件,删除新列,然后使用插入语句将更改后的行放入 table,但这比我们当前的时间要长得多过程(CSV 文件可以有数千或数十万条记录)。
我已经开始研究使用 ADO 连接,但我的本地计算机既没有 ACE 也没有 JET 提供程序,而且我认为部署包的服务器也缺少这些提供程序(我怀疑我能否将它们安装在部署服务器)。
我不知如何做才能加载 tables 并能够忽略新添加或删除的列(尽管如果 CSV 文件缺少列 table 有,这没什么大不了的)快速可靠。有什么想法吗?
查看 BiML,它会在 运行 时根据元数据动态构建和执行您的 SSIS 包。
基于此评论:
I've tried reading in the CSV files line by line, stripping out new columns, and then using an insert statement to put the altered line into the table, but that takes far longer than our current process (the CSV files can have thousands or hundreds of thousands of records).
还有这个:
I used a csvreader to read the file. The insert was via a sqlcommand object.
乍一看,瓶颈不在平面文件源,而在目标。 OLEDB 命令以逐行方式执行,每个输入行一个语句。通过将其更改为 OLEDB 目标,它将把过程转换为批量插入操作。要对此进行测试,只需使用平面文件源并将其连接到派生列。 运行 并检查速度。如果速度更快,请更改为 oledb 目标并重试。它还有助于插入堆(无聚集或非聚集索引)并使用 tablock。
但是,这并不能解决您整个文件多样化的问题。如果您在设计时最初配置它的方式少了一列或多列,我不知道平面文件源会做什么。它可能会失败,或者它可能会以某种参差不齐的形式导入行,其中下一行的一部分被分配给当前行的最后一列。那可能会是一团糟。
但是,我知道当平面文件源获得额外的列时会发生什么。我为此添加了这个连接项,但遗憾地被拒绝了:https://connect.microsoft.com/SQLServer/feedback/details/963631/ssis-parses-flat-files-incorrectly-when-the-source-file-contains-unexpected-extra-columns
发生的情况是额外的列连接到最后一列。如果您计划这样做,您可以使最后一列变大,然后从暂存 table 中解析 SQL。此外,您可以将整行塞入 SQL 并从那里解析每一列。虽然这有点笨拙,因为您将有很多 CHARINDEX() 检查所有位置的值的位置。
一个更简单的选择可能是在脚本任务中使用 split() 的某些组合在 .Net 中解析它以获取所有值并检查数组中值的计数以了解您有多少列。这还允许您根据找到的内容将行定向到不同的缓冲区。
最后,您可以要求供应商承诺一种格式。固定数量的列或使用处理变化的格式,如 XML.
我有一个源脚本组件的 C# 解决方案(我没有检查过,但我认为它有效)。
它将使用拆分将 header 读入数组。
然后对每个数据行使用相同的拆分函数并使用 header 值检查列并使用 rowval 设置输出。
您需要将所有输出列放入输出区域。
所有不存在的列在退出时都将具有空值。
public override void CreateNewOutputRows()
{
using (System.IO.StreamReader sr = new System.IO.StreamReader(@"[filepath and name]"))
{
while (!sr.EndOfStream)
{
string FullText = sr.ReadToEnd().ToString();
string[] rows = FullText.Split('\n');
//Get header values
string[] header = rows[0].Split(',');
for (int i = 1; i < rows.Length - 1; i++)
{
string[] rowVals = rows[i].Split(',');
for (int j = 0; j < rowVals.Length - 1; j++)
{
Output0Buffer.AddRow();
//Deal with each known header name
switch (header[j])
{
case "Field 1 Name": //this is where you use known column names
Output0Buffer.FieldOneName = rowVals[j]; //Cast if not string
break;
case "Field 2 Name":
Output0Buffer.FieldTwoName = rowVals[j]; //Cast if not string
break;
//continue this pattern for all column names
}
}
}
}
}
}
我采用了不同的方法,这似乎有效(在我解决了一些问题之后)。我所做的是获取 CSV 文件行并将它们放入临时数据 table。完成后,我从 datatable 批量复制到我的数据库。为了处理丢失的或新的列,我确定了哪些列对于 CSV 和 table 是共同的,并且只处理那些共同的列(新列已记录在日志文件中,以便以后添加)。这是我的 BulkCopy 模块:
Private Sub BulkCopy(csvFile As String)
Dim i As Integer
Dim rowCount As Int32 = 0
Dim colCount As Int32 = 0
Dim writeThis As ArrayList = New ArrayList
tempTable = New DataTable()
Try
'1) Set up the columns in the temporary data table, using commonColumns
For i = 0 To commonColumns.Count - 1
tempTable.Columns.Add(New DataColumn(commonColumns(i).ToString))
tempTable.Columns(i).DataType = GetDataType(commonColumns(i).ToString)
Next
'2) Start adding data from the csv file to the temporary data table
While Not csvReader.EndOfData
currentRow = csvReader.ReadFields() 'Read the next row of the csv file
rowCount += 1
writeThis.Clear()
For index = 0 To UBound(currentRow)
If commonColumns.Contains(csvColumns(index)) Then
Dim location As Integer = tableColumns.IndexOf(csvColumns(index))
Dim columnType As String = tableColumnTypes(location).ToString
If currentRow(index).Length = 0 Then
writeThis.Add(DBNull.Value)
Else
writeThis.Add(currentRow(index))
End If
'End Select
End If
Next
Dim row As DataRow = tempTable.NewRow()
row.ItemArray = writeThis.ToArray
tempTable.Rows.Add(row)
End While
csvReader.Close()
'3) Bulk copy the temporary data table to the database table.
Using copy As New SqlBulkCopy(dbConnection)
'3.1) Set up the column mappings
For i = 0 To commonColumns.Count - 1
copy.ColumnMappings.Add(commonColumns(i).ToString, commonColumns(i).ToString)
Next
'3.2) Set the destination table name
copy.DestinationTableName = tableName
'3.3) Copy the temporary data table to the database table
copy.WriteToServer(tempTable)
End Using
Catch ex As Exception
message = "*****ERROR*****" + vbNewLine
message += "BulkCopy: Encountered an exception of type " + ex.GetType.ToString()
message += ": " + ex.Message + vbNewLine + "***************" + vbNewLine
LogThis(message)
End Try
End Sub
可能还有更优雅的东西,但到目前为止这似乎可行。