并行文件下载损坏

Parallel file download corruption

我在下面有这段代码,它能够并行下载文件的多个部分并使用内存映射文件写入它们。问题出现在 DownloadFile() 函数中。该文件可以正常开始下载,但在下载过程中会损坏。例如,如果我尝试下载图像,它的某些部分将被损坏。我不确定这是否来自代码中的某种竞争传导,或者它是否与零件的内容范围计算有关。任何有关问题发生的方式或发生方式的帮助将不胜感激,谢谢!

最小的、可重现的例子:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;

namespace ZenTester
{
internal class FileChunk
{
    public long Start { get; set; }
    public long End { get; set; }
    public FileChunk(){}
    public int Id { get; set; }

    public FileChunk(long startByte, long endByte)
    {
        Start = startByte;
        End = endByte;
    }
}

internal class RetryHandler : DelegatingHandler
{
    private int _maxRetries = 3;

    public RetryHandler(HttpMessageHandler innerHandler) : base(innerHandler) { }

    public RetryHandler(HttpMessageHandler innerHandler, int maxRetries) : base(innerHandler)
    {
        _maxRetries = maxRetries;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        HttpResponseMessage response = null;
        for (var i = 0; i < _maxRetries; i++)
        {
            response = await base.SendAsync(request, cancellationToken);
            if (response.IsSuccessStatusCode)
            {
                return response;
            }
        }

        return response;
    }
}

public static class ZenTester
{
    private static async Task DownloadFile(string url, int parts, string outFile = null!)
    {
        var responseLength = (await WebRequest.Create(url).GetResponseAsync()).ContentLength;
        var partSize = (long)Math.Floor(responseLength / (parts + 0.0));
        var pieces = new List<FileChunk>();
        var uri = new Uri(url);
        
        WriteLine(responseLength.ToString(CultureInfo.InvariantCulture) + " TOTAL SIZE");
        WriteLine(partSize.ToString(CultureInfo.InvariantCulture) + " PART SIZE" + "\n");
        
        string filename = outFile ?? Path.GetFileName(uri.LocalPath);

        var mmf = MemoryMappedFile.CreateFromFile(filename, FileMode.OpenOrCreate, null, responseLength);

        var httpPool = new HttpClient(new RetryHandler(new HttpClientHandler(), 10)) {MaxResponseContentBufferSize = 1000000000};
        
        //Loop to add all the events to the queue
        for (long i = 0; i < responseLength; i += partSize)
        {
            pieces.Add(i + partSize < responseLength
                ? new FileChunk(i, i + partSize)
                : new FileChunk(i, responseLength));
        }

        await Parallel.ForEachAsync(pieces, parallelOptions, async (piece, cancellationToken) =>
        {
            var client = httpPool.Get();
            var request = new HttpRequestMessage { RequestUri = new Uri(url) };
            request.Headers.Range = new RangeHeaderValue(piece.Start, piece.End);
                
            var message = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).Result;

            if (message.IsSuccessStatusCode)
            {
                await using var streamToRead = await message.Content.ReadAsStreamAsync();
                var streams = mmf.CreateViewStream(piece.Start, piece.End-piece.Start - 1);
                var T = streamToRead.CopyToAsync(streams);
                T.Wait();
                if (T.IsCompletedSuccessfully)
                {
                    streams.Flush();
                    streams.Close();
                }
            }
        });
        
    }
    
    public static void Main(string[] args)
    {
        var url = "https://wallpaperaccess.com/full/2159447.jpg";

        var s = DownloadFile(url, 8);
        
        s.Wait();
    }
}
}

我发现发生了损坏,因为某些范围会大于开始-结束。我解决了这个问题,但向内存映射文件和视图流添加了一个读写修饰符。然后我把视图流的大小改成了sendAsync请求返回的内容的长度:

var streams = mmf.CreateViewStream(piece.Start, message.Content.Headers.ContentLength!.Value,
                        MemoryMappedFileAccess.ReadWrite);