并行文件下载损坏
Parallel file download corruption
我在下面有这段代码,它能够并行下载文件的多个部分并使用内存映射文件写入它们。问题出现在 DownloadFile()
函数中。该文件可以正常开始下载,但在下载过程中会损坏。例如,如果我尝试下载图像,它的某些部分将被损坏。我不确定这是否来自代码中的某种竞争传导,或者它是否与零件的内容范围计算有关。任何有关问题发生的方式或发生方式的帮助将不胜感激,谢谢!
最小的、可重现的例子:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
namespace ZenTester
{
internal class FileChunk
{
public long Start { get; set; }
public long End { get; set; }
public FileChunk(){}
public int Id { get; set; }
public FileChunk(long startByte, long endByte)
{
Start = startByte;
End = endByte;
}
}
internal class RetryHandler : DelegatingHandler
{
private int _maxRetries = 3;
public RetryHandler(HttpMessageHandler innerHandler) : base(innerHandler) { }
public RetryHandler(HttpMessageHandler innerHandler, int maxRetries) : base(innerHandler)
{
_maxRetries = maxRetries;
}
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
HttpResponseMessage response = null;
for (var i = 0; i < _maxRetries; i++)
{
response = await base.SendAsync(request, cancellationToken);
if (response.IsSuccessStatusCode)
{
return response;
}
}
return response;
}
}
public static class ZenTester
{
private static async Task DownloadFile(string url, int parts, string outFile = null!)
{
var responseLength = (await WebRequest.Create(url).GetResponseAsync()).ContentLength;
var partSize = (long)Math.Floor(responseLength / (parts + 0.0));
var pieces = new List<FileChunk>();
var uri = new Uri(url);
WriteLine(responseLength.ToString(CultureInfo.InvariantCulture) + " TOTAL SIZE");
WriteLine(partSize.ToString(CultureInfo.InvariantCulture) + " PART SIZE" + "\n");
string filename = outFile ?? Path.GetFileName(uri.LocalPath);
var mmf = MemoryMappedFile.CreateFromFile(filename, FileMode.OpenOrCreate, null, responseLength);
var httpPool = new HttpClient(new RetryHandler(new HttpClientHandler(), 10)) {MaxResponseContentBufferSize = 1000000000};
//Loop to add all the events to the queue
for (long i = 0; i < responseLength; i += partSize)
{
pieces.Add(i + partSize < responseLength
? new FileChunk(i, i + partSize)
: new FileChunk(i, responseLength));
}
await Parallel.ForEachAsync(pieces, parallelOptions, async (piece, cancellationToken) =>
{
var client = httpPool.Get();
var request = new HttpRequestMessage { RequestUri = new Uri(url) };
request.Headers.Range = new RangeHeaderValue(piece.Start, piece.End);
var message = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).Result;
if (message.IsSuccessStatusCode)
{
await using var streamToRead = await message.Content.ReadAsStreamAsync();
var streams = mmf.CreateViewStream(piece.Start, piece.End-piece.Start - 1);
var T = streamToRead.CopyToAsync(streams);
T.Wait();
if (T.IsCompletedSuccessfully)
{
streams.Flush();
streams.Close();
}
}
});
}
public static void Main(string[] args)
{
var url = "https://wallpaperaccess.com/full/2159447.jpg";
var s = DownloadFile(url, 8);
s.Wait();
}
}
}
我发现发生了损坏,因为某些范围会大于开始-结束。我解决了这个问题,但向内存映射文件和视图流添加了一个读写修饰符。然后我把视图流的大小改成了sendAsync请求返回的内容的长度:
var streams = mmf.CreateViewStream(piece.Start, message.Content.Headers.ContentLength!.Value,
MemoryMappedFileAccess.ReadWrite);
我在下面有这段代码,它能够并行下载文件的多个部分并使用内存映射文件写入它们。问题出现在 DownloadFile()
函数中。该文件可以正常开始下载,但在下载过程中会损坏。例如,如果我尝试下载图像,它的某些部分将被损坏。我不确定这是否来自代码中的某种竞争传导,或者它是否与零件的内容范围计算有关。任何有关问题发生的方式或发生方式的帮助将不胜感激,谢谢!
最小的、可重现的例子:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
namespace ZenTester
{
internal class FileChunk
{
public long Start { get; set; }
public long End { get; set; }
public FileChunk(){}
public int Id { get; set; }
public FileChunk(long startByte, long endByte)
{
Start = startByte;
End = endByte;
}
}
internal class RetryHandler : DelegatingHandler
{
private int _maxRetries = 3;
public RetryHandler(HttpMessageHandler innerHandler) : base(innerHandler) { }
public RetryHandler(HttpMessageHandler innerHandler, int maxRetries) : base(innerHandler)
{
_maxRetries = maxRetries;
}
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
HttpResponseMessage response = null;
for (var i = 0; i < _maxRetries; i++)
{
response = await base.SendAsync(request, cancellationToken);
if (response.IsSuccessStatusCode)
{
return response;
}
}
return response;
}
}
public static class ZenTester
{
private static async Task DownloadFile(string url, int parts, string outFile = null!)
{
var responseLength = (await WebRequest.Create(url).GetResponseAsync()).ContentLength;
var partSize = (long)Math.Floor(responseLength / (parts + 0.0));
var pieces = new List<FileChunk>();
var uri = new Uri(url);
WriteLine(responseLength.ToString(CultureInfo.InvariantCulture) + " TOTAL SIZE");
WriteLine(partSize.ToString(CultureInfo.InvariantCulture) + " PART SIZE" + "\n");
string filename = outFile ?? Path.GetFileName(uri.LocalPath);
var mmf = MemoryMappedFile.CreateFromFile(filename, FileMode.OpenOrCreate, null, responseLength);
var httpPool = new HttpClient(new RetryHandler(new HttpClientHandler(), 10)) {MaxResponseContentBufferSize = 1000000000};
//Loop to add all the events to the queue
for (long i = 0; i < responseLength; i += partSize)
{
pieces.Add(i + partSize < responseLength
? new FileChunk(i, i + partSize)
: new FileChunk(i, responseLength));
}
await Parallel.ForEachAsync(pieces, parallelOptions, async (piece, cancellationToken) =>
{
var client = httpPool.Get();
var request = new HttpRequestMessage { RequestUri = new Uri(url) };
request.Headers.Range = new RangeHeaderValue(piece.Start, piece.End);
var message = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).Result;
if (message.IsSuccessStatusCode)
{
await using var streamToRead = await message.Content.ReadAsStreamAsync();
var streams = mmf.CreateViewStream(piece.Start, piece.End-piece.Start - 1);
var T = streamToRead.CopyToAsync(streams);
T.Wait();
if (T.IsCompletedSuccessfully)
{
streams.Flush();
streams.Close();
}
}
});
}
public static void Main(string[] args)
{
var url = "https://wallpaperaccess.com/full/2159447.jpg";
var s = DownloadFile(url, 8);
s.Wait();
}
}
}
我发现发生了损坏,因为某些范围会大于开始-结束。我解决了这个问题,但向内存映射文件和视图流添加了一个读写修饰符。然后我把视图流的大小改成了sendAsync请求返回的内容的长度:
var streams = mmf.CreateViewStream(piece.Start, message.Content.Headers.ContentLength!.Value,
MemoryMappedFileAccess.ReadWrite);