在 C# 中扩展流以允许来自 http 的无缝多流

Extend a stream to allow seamless multistreams from http in C#

目前我正在使用 HttpWebRequest 和 HttpWebResponse 检查 StatusCode 并下载文件。

我正在下载的文件是 .zip 文件(我首先尝试)或.zip 文件的一部分(.z01、.z02 等)。由于部分文件的大小始终为 20 兆字节(在我的例子中),我可以检查我是否恰好下载了 20 兆字节,如果是,则尝试继续下一部分。如果没有,我假设我正在下载最后一部分。

为了在 Java 中实现这一点,我将扩展 InputStream 并覆盖其读取方法和 return 其中的下一个流,从当前流读取或关闭它并使用 HttpURLConnection 检索下一个流 - 如果没有下一个流,则假设它是最后一个流。

尝试在 c# 中扩展 Stream,但是由于 Stream-class 是抽象的,我必须完全实现一些我真的不需要/不想实现的方法。 BufferedStream被密封。所以我唯一的选择是扩展 MemoryStream 如果我不想实现 Stream-class 的所有方法。

在 c# 中执行此操作的好方法是什么?

我需要流的原因是,我正在处理流 (unzipping/decoding) 它 - 它已经可以完美地处理单个 .zip 文件。因为我还可以指定偏移量,所以我可以从偏移量下载。
对于多部分流,我会将给定的偏移量除以 20 兆字节,将其转换为正确的部分文件名。

编辑: 我拒绝将 zip 和 decode 添加为标签,因为我下载的文件类型并不重要(顺便说一句:这不是真正的zip,而是一个 LZMA 编码的文件,我对其进行了解码)。

只需在不需要实际实现的方法中执行此操作...

throw new NotImplementedException();

我的方法可能不是最好的方法,我也没有测试 Position 属性,但它可以完成工作。

我从 Stream 派生了一个 class 并围绕它实现了一个包装器,它处理设置 currentStream-object.

服务器上的文件通常以 .zip 或 .z01、.z02、.z03 等格式保存。MultiPartStream(string) 构造函数让 MultiPartStream "guess" 正确 URL。 GetPart() 通常会在检查响应时抑制错误。 MultiPartStream(string, string) 构造函数只允许下载文件,同时还返回 Transferspeed。

我可能会随时更改此 class,但这已经满足了我的需要,并且由于我不喜欢有一个开放的问题,所以我决定提供一个对我有用的回复。如果我最终改变了很多,我可能会稍后更新它。

虽然考虑到这一点,但我可能会改为 classed NetworkStream。也许我会改变它,但到目前为止,这个 MultiPartStream 运行良好,它实际上使用了我拥有的所有带宽。此外,它还有 TotalStreamSize 和 Transferspeed。

MultiPartStream.cs:

using MyProject.Logic;
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace MyProject.Model
{
    public class MultiPartStream : Stream
    {
        private HttpWebResponse response;
        private Stream currentStream;
        private List<KeyValuePair<string, long>> parts = new List<KeyValuePair<string,long>>();

        private Stopwatch timer = new Stopwatch();
        private long totalStreamSize;
        private long bytesRead;
        private string file;
        private int currentPart = 0;

        public MultiPartStream(string file)
        {
            this.file = file;
            GetParts();
            NextStream();
        }

        public MultiPartStream(string file, string url)
        {
            this.file = file;
            GetPart(url, false);
            NextStream();
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (!timer.IsRunning)
                timer.Start();

            int bytesRead = currentStream.Read(buffer, offset, count);
            if (bytesRead <= 0 && currentPart < parts.Count - 1)
            {
                // if we cannot read from the Stream anymore, we open the next Stream if one is available
                currentPart++;
                NextStream();
                bytesRead = Read(buffer, offset, count);
            }
            this.bytesRead += bytesRead;

            return (int)bytesRead;
        }

        protected override void Dispose(bool disposing)
        {
            timer.Stop();
            base.Dispose(disposing);
        }

        private void GetParts()
        {
            if (!this.GetPart(DownloadHelper.Instance.GetSingleFileUrl(file)))
            {
                int filePart = 1;
                while (this.GetPart(DownloadHelper.Instance.GetMultiFileUrl(file, filePart)))
                {
                    filePart++;
                }
            }
        }

        private bool GetPart(string url, bool suppressApplicationException = true)
        {
            bool thisPartAdded = false;

            HttpWebRequest request = HttpWebRequest.CreateHttp(url);

            try
            {
                HttpWebResponse response = (HttpWebResponse)request.GetResponse();
                if (request.HaveResponse && response.StatusCode == HttpStatusCode.OK)
                {
                    parts.Add(new KeyValuePair<string, long>(url, response.ContentLength));
                    totalStreamSize += response.ContentLength;
                    thisPartAdded = true;
                }
                response.Close();
            }
            catch (WebException e)
            {
                if (!suppressApplicationException)
                    throw new ApplicationException(String.Format("Error: cannot access file '{0}' ({1}: {2})", url, e.Status, e.Message));
            }

            return thisPartAdded;
        }

        private void NextStream()
        {
            if (response != null)
            {
                response.Close();
                response = null;
            }

            HttpWebRequest request = HttpWebRequest.CreateHttp(parts[currentPart].Key);

            try
            {
                response = (HttpWebResponse)request.GetResponse();
                if (request.HaveResponse && response.StatusCode == HttpStatusCode.OK)
                {
                    currentStream = response.GetResponseStream();
                }
            }
            catch (WebException e)
            {
                throw new ApplicationException(String.Format("Error: could not open file '{0}' for streaming ({1}: {2})", parts[currentPart].Key, e.Status, e.Message));
            }
        }

        public long TotalStreamSize
        {
            get { return totalStreamSize; }
        }

        public string TransferSpeed
        {
            get
            {
                float transferSpeed = (float)(bytesRead / 1024f) / (timer.ElapsedMilliseconds / 1000f);
                if (transferSpeed > 1024f)
                {
                    return "@ " + Math.Round(transferSpeed / 1024f, 2) + " mb / s";
                }
                else
                {
                    return "@ " + Math.Round(transferSpeed, 2) + " kb / s";
                }
            }
        }

        #region Remaining Stream-methods
        public override long Position
        {
            get
            {
                long valueToCompare = 0;
                for (int index = 0; index < currentPart; index++)
                {
                    valueToCompare += parts[index].Value;
                }
                return valueToCompare + currentStream.Position;
            }
            set
            {
                long valueToCompare = 0;
                for (int index = 0; index < parts.Count; index++)
                {
                    valueToCompare += parts[index].Value;
                    if (value <= valueToCompare)
                    {
                        currentPart = index;
                        NextStream();
                        currentStream.Position = value - (valueToCompare - parts[index].Value);
                    }
                }
            }
        }

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush()
        {
            if (currentPart > 0)
            {
                currentPart = 0;
                NextStream();
            }
            else if (currentPart == 0)
            {
                currentStream.Flush();
            }
        }

        public override long Length
        {
            get { return totalStreamSize; }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }
        #endregion
    }
}

其他相关方法:

public string GetSingleFileUrl(string file)
{
    return String.Format("{0}/Zip/{1}.zip", this.GetBaseUrl(), file.Replace(@"\", "/"));
}

public string GetMultiFileUrl(string file, int part)
{
    return String.Format("{0}/Zip/{1}.z{2:D2}", this.GetBaseUrl(), file.Replace(@"\", "/"), part);
}

这个 class 并不完美,因为它是为特定文件源严格编写的,但它完成了它的工作。如果我最终将它改进到我真正满意的程度,我一定会更新我的回复。