在 C# 中扩展流以允许来自 http 的无缝多流
Extend a stream to allow seamless multistreams from http in C#
目前我正在使用 HttpWebRequest 和 HttpWebResponse 检查 StatusCode 并下载文件。
我正在下载的文件是 .zip 文件(我首先尝试)或.zip 文件的一部分(.z01、.z02 等)。由于部分文件的大小始终为 20 兆字节(在我的例子中),我可以检查我是否恰好下载了 20 兆字节,如果是,则尝试继续下一部分。如果没有,我假设我正在下载最后一部分。
为了在 Java 中实现这一点,我将扩展 InputStream 并覆盖其读取方法和 return 其中的下一个流,从当前流读取或关闭它并使用 HttpURLConnection 检索下一个流 - 如果没有下一个流,则假设它是最后一个流。
我 尝试在 c# 中扩展 Stream,但是由于 Stream-class 是抽象的,我必须完全实现一些我真的不需要/不想实现的方法。 BufferedStream被密封。所以我唯一的选择是扩展 MemoryStream 如果我不想实现 Stream-class 的所有方法。
在 c# 中执行此操作的好方法是什么?
我需要流的原因是,我正在处理流 (unzipping/decoding) 它 - 它已经可以完美地处理单个 .zip 文件。因为我还可以指定偏移量,所以我可以从偏移量下载。
对于多部分流,我会将给定的偏移量除以 20 兆字节,将其转换为正确的部分文件名。
编辑: 我拒绝将 zip 和 decode 添加为标签,因为我下载的文件类型并不重要(顺便说一句:这不是真正的zip,而是一个 LZMA 编码的文件,我对其进行了解码)。
只需在不需要实际实现的方法中执行此操作...
throw new NotImplementedException();
我的方法可能不是最好的方法,我也没有测试 Position 属性,但它可以完成工作。
我从 Stream 派生了一个 class 并围绕它实现了一个包装器,它处理设置 currentStream-object.
服务器上的文件通常以 .zip 或 .z01、.z02、.z03 等格式保存。MultiPartStream(string) 构造函数让 MultiPartStream "guess" 正确 URL。 GetPart() 通常会在检查响应时抑制错误。 MultiPartStream(string, string) 构造函数只允许下载文件,同时还返回 Transferspeed。
我可能会随时更改此 class,但这已经满足了我的需要,并且由于我不喜欢有一个开放的问题,所以我决定提供一个对我有用的回复。如果我最终改变了很多,我可能会稍后更新它。
虽然考虑到这一点,但我可能会改为 classed NetworkStream。也许我会改变它,但到目前为止,这个 MultiPartStream 运行良好,它实际上使用了我拥有的所有带宽。此外,它还有 TotalStreamSize 和 Transferspeed。
MultiPartStream.cs:
using MyProject.Logic;
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
namespace MyProject.Model
{
public class MultiPartStream : Stream
{
private HttpWebResponse response;
private Stream currentStream;
private List<KeyValuePair<string, long>> parts = new List<KeyValuePair<string,long>>();
private Stopwatch timer = new Stopwatch();
private long totalStreamSize;
private long bytesRead;
private string file;
private int currentPart = 0;
public MultiPartStream(string file)
{
this.file = file;
GetParts();
NextStream();
}
public MultiPartStream(string file, string url)
{
this.file = file;
GetPart(url, false);
NextStream();
}
public override int Read(byte[] buffer, int offset, int count)
{
if (!timer.IsRunning)
timer.Start();
int bytesRead = currentStream.Read(buffer, offset, count);
if (bytesRead <= 0 && currentPart < parts.Count - 1)
{
// if we cannot read from the Stream anymore, we open the next Stream if one is available
currentPart++;
NextStream();
bytesRead = Read(buffer, offset, count);
}
this.bytesRead += bytesRead;
return (int)bytesRead;
}
protected override void Dispose(bool disposing)
{
timer.Stop();
base.Dispose(disposing);
}
private void GetParts()
{
if (!this.GetPart(DownloadHelper.Instance.GetSingleFileUrl(file)))
{
int filePart = 1;
while (this.GetPart(DownloadHelper.Instance.GetMultiFileUrl(file, filePart)))
{
filePart++;
}
}
}
private bool GetPart(string url, bool suppressApplicationException = true)
{
bool thisPartAdded = false;
HttpWebRequest request = HttpWebRequest.CreateHttp(url);
try
{
HttpWebResponse response = (HttpWebResponse)request.GetResponse();
if (request.HaveResponse && response.StatusCode == HttpStatusCode.OK)
{
parts.Add(new KeyValuePair<string, long>(url, response.ContentLength));
totalStreamSize += response.ContentLength;
thisPartAdded = true;
}
response.Close();
}
catch (WebException e)
{
if (!suppressApplicationException)
throw new ApplicationException(String.Format("Error: cannot access file '{0}' ({1}: {2})", url, e.Status, e.Message));
}
return thisPartAdded;
}
private void NextStream()
{
if (response != null)
{
response.Close();
response = null;
}
HttpWebRequest request = HttpWebRequest.CreateHttp(parts[currentPart].Key);
try
{
response = (HttpWebResponse)request.GetResponse();
if (request.HaveResponse && response.StatusCode == HttpStatusCode.OK)
{
currentStream = response.GetResponseStream();
}
}
catch (WebException e)
{
throw new ApplicationException(String.Format("Error: could not open file '{0}' for streaming ({1}: {2})", parts[currentPart].Key, e.Status, e.Message));
}
}
public long TotalStreamSize
{
get { return totalStreamSize; }
}
public string TransferSpeed
{
get
{
float transferSpeed = (float)(bytesRead / 1024f) / (timer.ElapsedMilliseconds / 1000f);
if (transferSpeed > 1024f)
{
return "@ " + Math.Round(transferSpeed / 1024f, 2) + " mb / s";
}
else
{
return "@ " + Math.Round(transferSpeed, 2) + " kb / s";
}
}
}
#region Remaining Stream-methods
public override long Position
{
get
{
long valueToCompare = 0;
for (int index = 0; index < currentPart; index++)
{
valueToCompare += parts[index].Value;
}
return valueToCompare + currentStream.Position;
}
set
{
long valueToCompare = 0;
for (int index = 0; index < parts.Count; index++)
{
valueToCompare += parts[index].Value;
if (value <= valueToCompare)
{
currentPart = index;
NextStream();
currentStream.Position = value - (valueToCompare - parts[index].Value);
}
}
}
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush()
{
if (currentPart > 0)
{
currentPart = 0;
NextStream();
}
else if (currentPart == 0)
{
currentStream.Flush();
}
}
public override long Length
{
get { return totalStreamSize; }
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
#endregion
}
}
其他相关方法:
public string GetSingleFileUrl(string file)
{
return String.Format("{0}/Zip/{1}.zip", this.GetBaseUrl(), file.Replace(@"\", "/"));
}
public string GetMultiFileUrl(string file, int part)
{
return String.Format("{0}/Zip/{1}.z{2:D2}", this.GetBaseUrl(), file.Replace(@"\", "/"), part);
}
这个 class 并不完美,因为它是为特定文件源严格编写的,但它完成了它的工作。如果我最终将它改进到我真正满意的程度,我一定会更新我的回复。
目前我正在使用 HttpWebRequest 和 HttpWebResponse 检查 StatusCode 并下载文件。
我正在下载的文件是 .zip 文件(我首先尝试)或.zip 文件的一部分(.z01、.z02 等)。由于部分文件的大小始终为 20 兆字节(在我的例子中),我可以检查我是否恰好下载了 20 兆字节,如果是,则尝试继续下一部分。如果没有,我假设我正在下载最后一部分。
为了在 Java 中实现这一点,我将扩展 InputStream 并覆盖其读取方法和 return 其中的下一个流,从当前流读取或关闭它并使用 HttpURLConnection 检索下一个流 - 如果没有下一个流,则假设它是最后一个流。
我 尝试在 c# 中扩展 Stream,但是由于 Stream-class 是抽象的,我必须完全实现一些我真的不需要/不想实现的方法。 BufferedStream被密封。所以我唯一的选择是扩展 MemoryStream 如果我不想实现 Stream-class 的所有方法。
在 c# 中执行此操作的好方法是什么?
我需要流的原因是,我正在处理流 (unzipping/decoding) 它 - 它已经可以完美地处理单个 .zip 文件。因为我还可以指定偏移量,所以我可以从偏移量下载。
对于多部分流,我会将给定的偏移量除以 20 兆字节,将其转换为正确的部分文件名。
编辑: 我拒绝将 zip 和 decode 添加为标签,因为我下载的文件类型并不重要(顺便说一句:这不是真正的zip,而是一个 LZMA 编码的文件,我对其进行了解码)。
只需在不需要实际实现的方法中执行此操作...
throw new NotImplementedException();
我的方法可能不是最好的方法,我也没有测试 Position 属性,但它可以完成工作。
我从 Stream 派生了一个 class 并围绕它实现了一个包装器,它处理设置 currentStream-object.
服务器上的文件通常以 .zip 或 .z01、.z02、.z03 等格式保存。MultiPartStream(string) 构造函数让 MultiPartStream "guess" 正确 URL。 GetPart() 通常会在检查响应时抑制错误。 MultiPartStream(string, string) 构造函数只允许下载文件,同时还返回 Transferspeed。
我可能会随时更改此 class,但这已经满足了我的需要,并且由于我不喜欢有一个开放的问题,所以我决定提供一个对我有用的回复。如果我最终改变了很多,我可能会稍后更新它。
虽然考虑到这一点,但我可能会改为 classed NetworkStream。也许我会改变它,但到目前为止,这个 MultiPartStream 运行良好,它实际上使用了我拥有的所有带宽。此外,它还有 TotalStreamSize 和 Transferspeed。
MultiPartStream.cs:
using MyProject.Logic;
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
namespace MyProject.Model
{
public class MultiPartStream : Stream
{
private HttpWebResponse response;
private Stream currentStream;
private List<KeyValuePair<string, long>> parts = new List<KeyValuePair<string,long>>();
private Stopwatch timer = new Stopwatch();
private long totalStreamSize;
private long bytesRead;
private string file;
private int currentPart = 0;
public MultiPartStream(string file)
{
this.file = file;
GetParts();
NextStream();
}
public MultiPartStream(string file, string url)
{
this.file = file;
GetPart(url, false);
NextStream();
}
public override int Read(byte[] buffer, int offset, int count)
{
if (!timer.IsRunning)
timer.Start();
int bytesRead = currentStream.Read(buffer, offset, count);
if (bytesRead <= 0 && currentPart < parts.Count - 1)
{
// if we cannot read from the Stream anymore, we open the next Stream if one is available
currentPart++;
NextStream();
bytesRead = Read(buffer, offset, count);
}
this.bytesRead += bytesRead;
return (int)bytesRead;
}
protected override void Dispose(bool disposing)
{
timer.Stop();
base.Dispose(disposing);
}
private void GetParts()
{
if (!this.GetPart(DownloadHelper.Instance.GetSingleFileUrl(file)))
{
int filePart = 1;
while (this.GetPart(DownloadHelper.Instance.GetMultiFileUrl(file, filePart)))
{
filePart++;
}
}
}
private bool GetPart(string url, bool suppressApplicationException = true)
{
bool thisPartAdded = false;
HttpWebRequest request = HttpWebRequest.CreateHttp(url);
try
{
HttpWebResponse response = (HttpWebResponse)request.GetResponse();
if (request.HaveResponse && response.StatusCode == HttpStatusCode.OK)
{
parts.Add(new KeyValuePair<string, long>(url, response.ContentLength));
totalStreamSize += response.ContentLength;
thisPartAdded = true;
}
response.Close();
}
catch (WebException e)
{
if (!suppressApplicationException)
throw new ApplicationException(String.Format("Error: cannot access file '{0}' ({1}: {2})", url, e.Status, e.Message));
}
return thisPartAdded;
}
private void NextStream()
{
if (response != null)
{
response.Close();
response = null;
}
HttpWebRequest request = HttpWebRequest.CreateHttp(parts[currentPart].Key);
try
{
response = (HttpWebResponse)request.GetResponse();
if (request.HaveResponse && response.StatusCode == HttpStatusCode.OK)
{
currentStream = response.GetResponseStream();
}
}
catch (WebException e)
{
throw new ApplicationException(String.Format("Error: could not open file '{0}' for streaming ({1}: {2})", parts[currentPart].Key, e.Status, e.Message));
}
}
public long TotalStreamSize
{
get { return totalStreamSize; }
}
public string TransferSpeed
{
get
{
float transferSpeed = (float)(bytesRead / 1024f) / (timer.ElapsedMilliseconds / 1000f);
if (transferSpeed > 1024f)
{
return "@ " + Math.Round(transferSpeed / 1024f, 2) + " mb / s";
}
else
{
return "@ " + Math.Round(transferSpeed, 2) + " kb / s";
}
}
}
#region Remaining Stream-methods
public override long Position
{
get
{
long valueToCompare = 0;
for (int index = 0; index < currentPart; index++)
{
valueToCompare += parts[index].Value;
}
return valueToCompare + currentStream.Position;
}
set
{
long valueToCompare = 0;
for (int index = 0; index < parts.Count; index++)
{
valueToCompare += parts[index].Value;
if (value <= valueToCompare)
{
currentPart = index;
NextStream();
currentStream.Position = value - (valueToCompare - parts[index].Value);
}
}
}
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush()
{
if (currentPart > 0)
{
currentPart = 0;
NextStream();
}
else if (currentPart == 0)
{
currentStream.Flush();
}
}
public override long Length
{
get { return totalStreamSize; }
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
#endregion
}
}
其他相关方法:
public string GetSingleFileUrl(string file)
{
return String.Format("{0}/Zip/{1}.zip", this.GetBaseUrl(), file.Replace(@"\", "/"));
}
public string GetMultiFileUrl(string file, int part)
{
return String.Format("{0}/Zip/{1}.z{2:D2}", this.GetBaseUrl(), file.Replace(@"\", "/"), part);
}
这个 class 并不完美,因为它是为特定文件源严格编写的,但它完成了它的工作。如果我最终将它改进到我真正满意的程度,我一定会更新我的回复。