如何将写入 Stream 1 的内容通过管道传输到 Stream 2?

How to pipe what is written to Stream 1 into Stream 2?

这是我的场景:

// The producer writes its output into the stream...
producer.WriteStream(stream);
// ...and the consumer reads its input from that same stream object.
consumer.ReadStream(stream);

我想要一种机制,让 producer 生成的字节能够逐步(增量地)传输给 consumer。

我可以先将所有内容写入 MemoryStream,然后把流的位置重置到开头,再交给 consumer 读取,但这样会消耗大量内存。

我怎样才能做到这一点?

使用管道作为数据的底层传输,您可以拥有一个允许这种通信机制的 "write stream"(服务器)和一个 "read stream"(客户端)。

这很简单,可以使用匿名管道或命名管道(如果您需要进程间通信)。要创建管道流:

// Create the server end of an anonymous pipe, then build the client end
// from the client handle string that the server end exposes.
var pipeServer = new AnonymousPipeServerStream();
var clientHandle = pipeServer.GetClientHandleAsString();
var pipeClient = new AnonymousPipeClientStream(clientHandle);

现在您可以使用这两个流分别进行写入和读取:

// Give the server end of the pipe to the producer to write into.
producer.WriteStream(pipeServer);
// somewhere else...
// Give the client end of the pipe to the consumer to read from.
consumer.ReadStream(pipeClient);

我只是出于兴趣随手写了下面这段代码,它未经测试,可能存在一些错误。您只需将 ReaderStream 传递给读取方(reader),并将 WriterStream 传递给写入方(writer)。

/// <summary>
/// An in-memory, one-directional pipe exposed as a pair of streams: bytes written to
/// <see cref="WriterStream"/> become readable, in the same order, from <see cref="ReaderStream"/>.
/// </summary>
/// <remarks>
/// Both streams share a single <see cref="BlockingCollection{T}"/> of byte chunks. Every
/// non-empty write enqueues a private copy of the written bytes; reads dequeue chunks and
/// block while the queue is empty and the writer has not been closed yet. Closing or
/// disposing <see cref="WriterStream"/> marks the end of the data, after which
/// <see cref="ReaderStream"/> returns 0 once all queued chunks are drained.
/// The queue is unbounded, so a producer that outpaces the consumer is never throttled.
/// </remarks>
public class LoopbackStream
{
    /// <summary>Read-only end of the pipe; hand this to the consumer.</summary>
    public Stream ReaderStream { get; }

    /// <summary>Write-only end of the pipe; hand this to the producer and close it when done.</summary>
    public Stream WriterStream { get; }

    private readonly BlockingCollection<byte[]> _buffer;

    /// <summary>Creates a connected reader/writer stream pair.</summary>
    public LoopbackStream()
    {
        _buffer = new BlockingCollection<byte[]>();
        ReaderStream = new ReaderStreamInternal(_buffer);
        WriterStream = new WriterStreamInternal(_buffer);
    }

    /// <summary>
    /// Validates the (buffer, offset, count) triple passed to Read/Write.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// The range described by <paramref name="offset"/> and <paramref name="count"/>
    /// does not fit inside <paramref name="buffer"/>.
    /// </exception>
    private static void CheckRange(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("Offset and count exceed the length of the buffer.");
        }
    }

    /// <summary>Write-only stream that enqueues each written chunk into the shared queue.</summary>
    private class WriterStreamInternal : Stream
    {
        private readonly BlockingCollection<byte[]> _buffer;

        public WriterStreamInternal(BlockingCollection<byte[]> buffer)
        {
            _buffer = buffer;
        }

        public override bool CanRead => false;

        public override bool CanSeek => false;

        // Writable until the stream has been closed (adding has been completed).
        public override bool CanWrite => !_buffer.IsAddingCompleted;

        /// <summary>
        /// Signals end-of-data to the reader. Overriding Dispose(bool) rather than Close()
        /// keeps the base class disposal logic running; Close() and Dispose() both end up here.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                // Safe to call repeatedly; later calls have no further effect.
                _buffer.CompleteAdding();
            }
            base.Dispose(disposing);
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Copies <paramref name="count"/> bytes starting at <paramref name="offset"/> and
        /// enqueues the copy for the reader.
        /// </summary>
        /// <exception cref="InvalidOperationException">The stream has already been closed.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            CheckRange(buffer, offset, count);

            // An empty chunk carries no data and must not reach the reader,
            // where a 0-byte read would be taken for end-of-stream.
            if (count == 0)
            {
                return;
            }

            // Copy, because the caller is free to reuse its buffer after Write returns.
            var chunk = new byte[count];
            Array.Copy(buffer, offset, chunk, 0, count);
            _buffer.Add(chunk);
        }

        public override void Flush()
        {
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
    }

    /// <summary>Read-only stream that drains chunks from the shared queue.</summary>
    private class ReaderStreamInternal : Stream
    {
        private readonly IEnumerator<byte[]> _readerEnumerator;

        // Chunk currently being consumed, or null when the next one must be fetched.
        private byte[] _currentBuffer;

        // Offset of the first unread byte inside _currentBuffer.
        private int _currentBufferIndex;

        public ReaderStreamInternal(BlockingCollection<byte[]> buffer)
        {
            _readerEnumerator = buffer.GetConsumingEnumerable().GetEnumerator();
        }

        public override bool CanRead => true;

        public override bool CanSeek => false;

        public override bool CanWrite => false;

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _readerEnumerator.Dispose();
            }
            base.Dispose(disposing);
        }

        /// <summary>
        /// Reads up to <paramref name="count"/> bytes from the current chunk, blocking until
        /// a chunk is available when none is pending.
        /// </summary>
        /// <returns>
        /// The number of bytes copied; 0 only when <paramref name="count"/> is 0 or the
        /// writer has been closed and every queued chunk has been consumed.
        /// </returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            CheckRange(buffer, offset, count);
            if (count == 0)
            {
                return 0;
            }

            // Fetch the next non-empty chunk; MoveNext blocks until the writer adds one
            // and returns false once the writer is closed and the queue is empty.
            while (_currentBuffer == null)
            {
                if (!_readerEnumerator.MoveNext())
                {
                    return 0;
                }

                var next = _readerEnumerator.Current;
                if (next != null && next.Length > 0)
                {
                    _currentBuffer = next;
                    // Start at the beginning of every new chunk.
                    _currentBufferIndex = 0;
                }
            }

            var remainingBytes = _currentBuffer.Length - _currentBufferIndex;
            var readBytes = Math.Min(remainingBytes, count);
            Array.Copy(_currentBuffer, _currentBufferIndex, buffer, offset, readBytes);
            _currentBufferIndex += readBytes;

            // Chunk fully consumed: release it so the next Read fetches a new one.
            if (_currentBufferIndex == _currentBuffer.Length)
            {
                _currentBuffer = null;
            }

            return readBytes;
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override void Flush()
        {
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
    }
}