将 CSV 从 Azure blob 加载到 Azure 的 SSIS 步骤 SQL
SSIS steps to load CSV from Azure blob to Azure SQL
我需要连接到 Azure blob(源)中的 CSV 文件,然后将数据加载到 Azure SQL 服务器 table,然后将 CSV 文件移动到不同的(存档) 天蓝色斑点。
如果没有 Azure,我会创建一个到本地文件的平面文件连接,然后 Data Flow task
使用 Source Assistant 和 Destination Assistant 将数据加载到 SQL 服务器 table , 然后 File System task
in Control Flow
将文件移动到存档目录。
我想做类似的事情,直接连接到 Azure blob 中的文件,然后在数据流任务之后,执行一个文件从一个 Azure blob(源)移动到另一个 Azure Blob(存档).
我能想到的最好的方法是使用 Azure Blob Download task
将 CSV 移动到 SSIS 运行 所在的 Azure VM(顺便说一下,你能在没有 VM 的情况下获得 Azure SSIS 服务吗?),然后在下载后创建一个 flat file connection
& Data Flow
来加载数据,然后对 Archive Blob 执行一个 Azure Blob Upload task
。
似乎应该有一种方法可以连接到源 Azure blob 文件并直接从中读取,而无需先下载它。同样,似乎应该有一种方法可以在 Azure blob 容器之间移动文件。我能想到的最好的是 download/upload 选项,但这需要一个中间位置(本地目录),并且不会在下载后删除源文件。是否有 SSIS Azure 工具来完成这些任务?
Any ideas for the other half of the problem: moving the files from the Source blob to an Archive blob after processing them?
据我所知,没有内置任务可让您实现此目的。根据我的测试,我假设您可以利用 Script Task 并编写代码(VB 或 C#)来直接处理 blob。下面是我的详细步骤,大家可以参考一下:
1) 使用 数据流 下的 Azure Blob Source and OLE DB Destination 将 CSV 文件从 Azure Blob 加载到 Azure SQL 数据库中。
2) 将 CSV 数据成功加载到 SQL table 后,使用脚本任务将源 blob 移动到存档 blob。
我会调用 Blob 服务 REST API copy Blob and Delete Blob with Container SAS token, you could leverage Microsoft Azure Storage Explorer and follow this official tutorial 来为您的 blob 容器生成 SAS 令牌。
假设源blob和目标blob在同一个容器下,那么我如下添加三个变量(SourceBlobUrl
、ContainerSasToken
、ArchiveBlobUrl
)并将它们添加为ReadOnlyVariables 脚本任务编辑器,您可以参考此tutorial在脚本任务中使用变量。
单击脚本任务编辑器下的编辑脚本 按钮以启动您在其中编写自定义脚本的 VSTA 开发环境。下面是ScriptMain.cs
下的Main方法如下:
public async void Main()
{
// TODO: Add your code here
string sasToken = Dts.Variables["ContainerSasToken"].Value.ToString();
string sourceBlobUrl = Dts.Variables["SourceBlobUrl"].Value.ToString();
string archiveBlobUrl = Dts.Variables["ArchiveBlobUrl"].Value.ToString();
try
{
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Add("x-ms-copy-source", sourceBlobUrl + sasToken);
//copy source blob to archive blob
Dts.Log($"start copying blob from [{sourceBlobUrl}] to [{archiveBlobUrl}]...", 0, new byte[0]);
HttpResponseMessage response = await client.PutAsync(archiveBlobUrl + sasToken, null);
if (response.StatusCode == HttpStatusCode.Accepted || response.StatusCode == HttpStatusCode.Created)
{
client.DefaultRequestHeaders.Clear();
Dts.Log($"start deleting blob [{sourceBlobUrl}]...", 0, new byte[0]);
//delete source blob
HttpResponseMessage result = await client.DeleteAsync(sourceBlobUrl + sasToken);
if (result.StatusCode == HttpStatusCode.Accepted || result.StatusCode == HttpStatusCode.Created)
{
Dts.TaskResult = (int)ScriptResults.Success;
return;
}
}
Dts.TaskResult = (int)ScriptResults.Failure;
}
catch (Exception ex)
{
Dts.Events.FireError(-1, "Script Task - Move source blob to an archive blob", ex.Message + "\r" + ex.StackTrace, String.Empty, 0);
Dts.TaskResult = (int)ScriptResults.Failure;
}
}
结果
此外,您还可以利用 Microsoft Azure Storage Client Library for .NET to access storage blob, at this point, you need to load the assembly in a SSIS script task that is not in the GAC, for more details you could refer to this official blog.
根据 Bruce 的建议,我决定使用 Azure 存储客户端库,它与 Bruce 提供的解决方案不同,因此我将我的工作代码发布给任何想要采用这种方法的人。
为此我找到了两个很好的参考资料:
http://microsoft-ssis.blogspot.com/2015/10/azure-file-system-task-for-ssis.html
http://cc.davelozinski.com/code/csharp-azure-blob-storage-manager-class
首先,在脚本任务中,添加对Microsoft.WindowsAzure.Storage
程序集的引用(C:\Program Files\Microsoft SDKs\Azure.NET SDK\v2.9\ToolsRef )
其次,添加命名空间引用:
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
最后,这是在 Azure 存储帐户之间移动文件的完整主要方法:
public void Main()
{
// Get parameter values
string blobFile = Dts.Variables["$Package::NewClientFile"].Value.ToString();
string containerName = Dts.Variables["$Project::ClientContainer"].Value.ToString();
// get connections
string connStrSource = Dts.Connections["azureadhocstorage"].AcquireConnection(Dts.Transaction).ToString();
string connStrTarget = Dts.Connections["azurearchivestorage"].AcquireConnection(Dts.Transaction).ToString();
try
{
// Retrieve storage accounts from connection string.
CloudStorageAccount storageAcctSource = CloudStorageAccount.Parse(connStrSource);
CloudStorageAccount storageAcctTarget = CloudStorageAccount.Parse(connStrTarget);
// Create the blob clients
CloudBlobClient blobClientSource = storageAcctSource.CreateCloudBlobClient();
CloudBlobClient blobClientTarget = storageAcctTarget.CreateCloudBlobClient();
// Create a reference to the container you want to delete
CloudBlobContainer containerSource = blobClientSource.GetContainerReference(containerName);
CloudBlobContainer containerTarget = blobClientTarget.GetContainerReference(containerName);
// get blockblob (the files) references
CloudBlockBlob blobBlockSource = containerSource.GetBlockBlobReference(blobFile);
CloudBlockBlob blobBlockTarget = containerTarget.GetBlockBlobReference(blobFile);
// copy the source to the target, waiting for it to finish (it is Asynchronous between separate accounts)
blobBlockTarget.StartCopy(blobBlockSource);
while (blobBlockTarget.CopyState.Status == CopyStatus.Pending)
{
// not done copying yet, so go to sleep
System.Threading.Thread.Sleep(100);
// refresh the copy status
blobBlockTarget.FetchAttributes();
}
// delete the source
blobBlockSource.Delete();
// Show success in log
bool fireAgain = true;
Dts.Events.FireInformation(0, "Move Block Blob", containerName + ": " + blobFile + " was moved successfully", string.Empty, 0, ref fireAgain);
// Close Script Task with Success
Dts.TaskResult = (int)ScriptResults.Success;
}
catch (Exception ex)
{
// Show Failure in log
Dts.Events.FireError(0, "Move Block Blob", ex.Message, string.Empty, 0);
// Close Script Task with Failure
Dts.TaskResult = (int)ScriptResults.Failure;
}
}
我需要连接到 Azure blob(源)中的 CSV 文件,然后将数据加载到 Azure SQL 服务器 table,然后将 CSV 文件移动到不同的(存档) 天蓝色斑点。
如果没有 Azure,我会创建一个到本地文件的平面文件连接,然后 Data Flow task
使用 Source Assistant 和 Destination Assistant 将数据加载到 SQL 服务器 table , 然后 File System task
in Control Flow
将文件移动到存档目录。
我想做类似的事情,直接连接到 Azure blob 中的文件,然后在数据流任务之后,执行一个文件从一个 Azure blob(源)移动到另一个 Azure Blob(存档).
我能想到的最好的方法是使用 Azure Blob Download task
将 CSV 移动到 SSIS 运行 所在的 Azure VM(顺便说一下,你能在没有 VM 的情况下获得 Azure SSIS 服务吗?),然后在下载后创建一个 flat file connection
& Data Flow
来加载数据,然后对 Archive Blob 执行一个 Azure Blob Upload task
。
似乎应该有一种方法可以连接到源 Azure blob 文件并直接从中读取,而无需先下载它。同样,似乎应该有一种方法可以在 Azure blob 容器之间移动文件。我能想到的最好的是 download/upload 选项,但这需要一个中间位置(本地目录),并且不会在下载后删除源文件。是否有 SSIS Azure 工具来完成这些任务?
Any ideas for the other half of the problem: moving the files from the Source blob to an Archive blob after processing them?
据我所知,没有内置任务可让您实现此目的。根据我的测试,我假设您可以利用 Script Task 并编写代码(VB 或 C#)来直接处理 blob。下面是我的详细步骤,大家可以参考一下:
1) 使用 数据流 下的 Azure Blob Source and OLE DB Destination 将 CSV 文件从 Azure Blob 加载到 Azure SQL 数据库中。
2) 将 CSV 数据成功加载到 SQL table 后,使用脚本任务将源 blob 移动到存档 blob。
我会调用 Blob 服务 REST API copy Blob and Delete Blob with Container SAS token, you could leverage Microsoft Azure Storage Explorer and follow this official tutorial 来为您的 blob 容器生成 SAS 令牌。
假设源blob和目标blob在同一个容器下,那么我如下添加三个变量(SourceBlobUrl
、ContainerSasToken
、ArchiveBlobUrl
)并将它们添加为ReadOnlyVariables 脚本任务编辑器,您可以参考此tutorial在脚本任务中使用变量。
单击脚本任务编辑器下的编辑脚本 按钮以启动您在其中编写自定义脚本的 VSTA 开发环境。下面是ScriptMain.cs
下的Main方法如下:
public async void Main()
{
// TODO: Add your code here
string sasToken = Dts.Variables["ContainerSasToken"].Value.ToString();
string sourceBlobUrl = Dts.Variables["SourceBlobUrl"].Value.ToString();
string archiveBlobUrl = Dts.Variables["ArchiveBlobUrl"].Value.ToString();
try
{
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Add("x-ms-copy-source", sourceBlobUrl + sasToken);
//copy source blob to archive blob
Dts.Log($"start copying blob from [{sourceBlobUrl}] to [{archiveBlobUrl}]...", 0, new byte[0]);
HttpResponseMessage response = await client.PutAsync(archiveBlobUrl + sasToken, null);
if (response.StatusCode == HttpStatusCode.Accepted || response.StatusCode == HttpStatusCode.Created)
{
client.DefaultRequestHeaders.Clear();
Dts.Log($"start deleting blob [{sourceBlobUrl}]...", 0, new byte[0]);
//delete source blob
HttpResponseMessage result = await client.DeleteAsync(sourceBlobUrl + sasToken);
if (result.StatusCode == HttpStatusCode.Accepted || result.StatusCode == HttpStatusCode.Created)
{
Dts.TaskResult = (int)ScriptResults.Success;
return;
}
}
Dts.TaskResult = (int)ScriptResults.Failure;
}
catch (Exception ex)
{
Dts.Events.FireError(-1, "Script Task - Move source blob to an archive blob", ex.Message + "\r" + ex.StackTrace, String.Empty, 0);
Dts.TaskResult = (int)ScriptResults.Failure;
}
}
结果
此外,您还可以利用 Microsoft Azure Storage Client Library for .NET to access storage blob, at this point, you need to load the assembly in a SSIS script task that is not in the GAC, for more details you could refer to this official blog.
根据 Bruce 的建议,我决定使用 Azure 存储客户端库,它与 Bruce 提供的解决方案不同,因此我将我的工作代码发布给任何想要采用这种方法的人。
为此我找到了两个很好的参考资料:
http://microsoft-ssis.blogspot.com/2015/10/azure-file-system-task-for-ssis.html
http://cc.davelozinski.com/code/csharp-azure-blob-storage-manager-class
首先,在脚本任务中,添加对Microsoft.WindowsAzure.Storage
程序集的引用(C:\Program Files\Microsoft SDKs\Azure.NET SDK\v2.9\ToolsRef )
其次,添加命名空间引用:
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
最后,这是在 Azure 存储帐户之间移动文件的完整主要方法:
public void Main()
{
// Get parameter values
string blobFile = Dts.Variables["$Package::NewClientFile"].Value.ToString();
string containerName = Dts.Variables["$Project::ClientContainer"].Value.ToString();
// get connections
string connStrSource = Dts.Connections["azureadhocstorage"].AcquireConnection(Dts.Transaction).ToString();
string connStrTarget = Dts.Connections["azurearchivestorage"].AcquireConnection(Dts.Transaction).ToString();
try
{
// Retrieve storage accounts from connection string.
CloudStorageAccount storageAcctSource = CloudStorageAccount.Parse(connStrSource);
CloudStorageAccount storageAcctTarget = CloudStorageAccount.Parse(connStrTarget);
// Create the blob clients
CloudBlobClient blobClientSource = storageAcctSource.CreateCloudBlobClient();
CloudBlobClient blobClientTarget = storageAcctTarget.CreateCloudBlobClient();
// Create a reference to the container you want to delete
CloudBlobContainer containerSource = blobClientSource.GetContainerReference(containerName);
CloudBlobContainer containerTarget = blobClientTarget.GetContainerReference(containerName);
// get blockblob (the files) references
CloudBlockBlob blobBlockSource = containerSource.GetBlockBlobReference(blobFile);
CloudBlockBlob blobBlockTarget = containerTarget.GetBlockBlobReference(blobFile);
// copy the source to the target, waiting for it to finish (it is Asynchronous between separate accounts)
blobBlockTarget.StartCopy(blobBlockSource);
while (blobBlockTarget.CopyState.Status == CopyStatus.Pending)
{
// not done copying yet, so go to sleep
System.Threading.Thread.Sleep(100);
// refresh the copy status
blobBlockTarget.FetchAttributes();
}
// delete the source
blobBlockSource.Delete();
// Show success in log
bool fireAgain = true;
Dts.Events.FireInformation(0, "Move Block Blob", containerName + ": " + blobFile + " was moved successfully", string.Empty, 0, ref fireAgain);
// Close Script Task with Success
Dts.TaskResult = (int)ScriptResults.Success;
}
catch (Exception ex)
{
// Show Failure in log
Dts.Events.FireError(0, "Move Block Blob", ex.Message, string.Empty, 0);
// Close Script Task with Failure
Dts.TaskResult = (int)ScriptResults.Failure;
}
}