将可观察字节数组转换为对象

Transformation of observable byte array to objects

我第一次在一个项目中使用 Reactive,我 运行 遇到了一个性能非常重要的问题。

概览:

我正在通过 TCP 套接字检索大量数据,我必须将这些数据解析为对象并插入到数据库中。每条消息都具有以下签名:

<payload-size> <payload>

其中 size 是 uint32 (4kb),它以字节为单位描述了以下有效负载的大小。

问题:

我想使用 Reactive Framework 提供的功能来并行执行以下步骤(见下文)以最大限度地提高性能并避免成为瓶颈。此外,我要求 'best practices' 来实现这个。

TCP Socket ---> Observable (ArraySegment<byte>) --> Observable (Message)

我已经实现了以下代码,它为我提供了 Observable (ArraySegment<byte>)

IObservable<TcpClient> observableTcpClient = endPoint.ListenerObservable(1);
IObservable<ArraySegment<byte>> observableSocket = observableTcpClient
.SelectMany(client => client.ToClientObservable(bufferSize));

我现在想运行将Observable (ArraySegment<byte>)变成Observable (Message)。我的第一个解决方案看起来有点像这样,因为我可以像流一样使用可观察对象。

Read continous bytestream from Stream using TcpClient and Reactive Extensions

问题:

是否可以(以及如何)使用以下方法创建可观察对象?或者有没有更好的方法可以推荐?我真的很感激一个很好的例子。

注意:Observable (ArraySegment) 的行为就像一个流,所以我不知道它推送给我的数据大小。 (我是否需要实施某种缓冲区,或者 Reactive Framework 可以帮助我吗?)

    Observable (ArraySegment<byte>) 
    --> Buffer(4kb) 
    --> ReadSize --> Buffer(payload-size) 
    --> ReadPayload 
    --> Parse Payload
    --> (Start over)

提前致谢! :)

编辑: 在 Dimitri 发表评论后,我在下面提出了修改后的解决方案。有一行需要拼命重构,但它似乎工作..

Window 使用了重载,因此我们可以编写自定义缓冲。

var hinge = new Subject<Unit>();

observableSocket
.SelectMany(i => i) // to IObservable<byte> 
.Window(() => hinge) // kinda-like-buffer
.Select(buff =>
{    
    return
        from size in buff.Buffer(SIZEOFUINT32).Select(ConvertToUINT32)
        from payload in buff.Buffer(size)
        //Refactor line below! Window must be closed somehow..
        from foo in Observable.Return(Unit.Default).Do( _ => hinge.OnNext(Unit.Default)) 
        select payload;                     
})
.SelectMany(i=>i)
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(ConvertToMessage);

编辑 2: 删除旧解决方案

这是我最终使用的解决方案。请随意评论可能的改进。

    public static IObservable<DataMessage> Convert(IObservable<ArraySegment<byte>> bytes)
            {
                const int headerSize = 12; // bytes

                return bytes.Scan(
                    new
                    {
                        Leftovers = new byte[0],
                        Messages = new List<DataMessage>(),
                        Header = (Header) null
                    },
                    (saved, current) =>
                    {
                        var data = ConcatdArrays(saved.Leftovers, current.ToArray());
                        var messages = new List<DataMessage>();
                        var header = saved.Header;

                        while (true)
                        {
                            // Header
                            if (header == null && data.Count >= headerSize)
                            {
                                header = ReadHeader(ref data, headerSize);
                            }

                            // Payload
                            else if (header != null)
                            {
                                var type = header.Type;
                                var size = DataItem.Size(type);

                                if (data.Count < size) break; // Still missing data

                                // Create new message with the gathered data
                                var payload = ReadPayload(ref data, size);
                                messages.Add(new DataMessage(header, payload));
                                header = null;
                            }

                            // Can't do more with the available data - try again next round.
                            else
                            {
                                break;
                            }
                        }

                        return new
                        {
                            Leftovers = data.ToArray(),
                            Messages = messages,
                            Header = header
                        };
                    }).SelectMany(list => list.Messages);