Download blobs from Azure Storage asynchronously and save them in a DataTable
The following code shows how I download blobs from Azure Blob Storage and save them into a DataTable:
foreach (var currIndexGroup in blobsGroupedByIndex)
{
    DataRow dr = dtResult.NewRow();
    foreach (var currIndex in currIndexGroup)
    {
        long fileByteLength = currIndex.Properties.Length;
        byte[] serializedAndCompressedResult = new byte[fileByteLength];
        currIndex.DownloadToByteArray(serializedAndCompressedResult, 0);
        dr[currIndex.Metadata["columnName"]] = DeflateStream.UncompressString(serializedAndCompressedResult);
    }
    dtResult.Rows.Add(dr);
}
The problem is that the download is slow: it takes about 20 seconds to download 1000 really small blobs. If I try to run it asynchronously with currIndex.DownloadToByteArrayAsync(serializedAndCompressedResult, 0); the subsequent line throws the exception Bad state (invalid stored block lengths).

What is the correct way to fill this DataTable asynchronously?
//the plan here is to make a model that holds your currIndex and byte array so you can return that model from a task
public class MyModel
{
    public CloudBlockBlob CurrIndex { get; set; }
    public byte[] FileBytes { get; set; }
}

foreach (var currIndexGroup in blobsGroupedByIndex)
{
    var myTasks = new List<Task<MyModel>>();
    foreach (var currIndex in currIndexGroup)
    {
        myTasks.Add(Task<MyModel>.Factory.StartNew(() =>
        {
            var myModel = new MyModel();
            myModel.CurrIndex = currIndex;
            long fileByteLength = myModel.CurrIndex.Properties.Length;
            myModel.FileBytes = new byte[fileByteLength];
            currIndex.DownloadToByteArray(myModel.FileBytes, 0);
            return myModel;
        }));
    }

    Task.WaitAll(myTasks.ToArray());

    //build one row per group with one column per blob, matching your original code
    DataRow dr = dtResult.NewRow();
    foreach (var task in myTasks)
    {
        MyModel myModel = task.Result;
        dr[myModel.CurrIndex.Metadata["columnName"]] = DeflateStream.UncompressString(myModel.FileBytes);
    }
    dtResult.Rows.Add(dr);
}
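As an aside, the Bad state (invalid stored block lengths) exception in the question is consistent with the buffer being decompressed before the download has finished: DownloadToByteArrayAsync returns a Task, and if that task is never awaited, the next line reads a partially filled array. If you prefer async/await over Task.Factory.StartNew, a sketch of the same idea (assuming the same MyModel, dtResult, blobsGroupedByIndex, and DeflateStream.UncompressString from the surrounding code, and that this runs inside an async method) could look like this:

```csharp
foreach (var currIndexGroup in blobsGroupedByIndex)
{
    //start one download task per blob in the group
    var myTasks = currIndexGroup.Select(async currIndex =>
    {
        var myModel = new MyModel { CurrIndex = currIndex };
        myModel.FileBytes = new byte[currIndex.Properties.Length];
        //awaiting guarantees the byte array is fully populated
        //before anything tries to decompress it
        await currIndex.DownloadToByteArrayAsync(myModel.FileBytes, 0);
        return myModel;
    }).ToList();

    MyModel[] myModels = await Task.WhenAll(myTasks);

    DataRow dr = dtResult.NewRow();
    foreach (var myModel in myModels)
    {
        dr[myModel.CurrIndex.Metadata["columnName"]] = DeflateStream.UncompressString(myModel.FileBytes);
    }
    dtResult.Rows.Add(dr);
}
```

This keeps the downloads concurrent within each group while avoiding thread-pool blocking, since Task.WhenAll awaits all downloads without tying up a thread per blob.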
You can further increase parallelism by using Parallel.ForEach on the outer foreach loop. You will have to lock dtResult to make it thread safe.
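A minimal sketch of that idea, assuming the same dtResult, blobsGroupedByIndex, and helpers as above (DataTable is not thread safe, so both NewRow and Rows.Add are serialized with a lock):

```csharp
Parallel.ForEach(blobsGroupedByIndex, currIndexGroup =>
{
    DataRow dr;
    lock (dtResult)
    {
        dr = dtResult.NewRow(); //NewRow touches shared table state
    }
    foreach (var currIndex in currIndexGroup)
    {
        var bytes = new byte[currIndex.Properties.Length];
        currIndex.DownloadToByteArray(bytes, 0);
        dr[currIndex.Metadata["columnName"]] = DeflateStream.UncompressString(bytes);
    }
    lock (dtResult)
    {
        dtResult.Rows.Add(dr); //Rows.Add must not run concurrently
    }
});
```

Writing into the detached DataRow itself is safe outside the lock, because each iteration owns its own row until it is added to the table.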