从事件中处理组合的可观察量

Disposing combined observables from events

我正在编写一个套接字服务器,它应该处理从连接到它的每个客户端接收到的每条消息。

所有消息都在一个可观察对象中排队,因此可以在外部订阅和观察。

为了确保所有客户端套接字消息都放在同一个可观察对象上,我使用了以下代码片段:

private Subject<IObservable<Msg>> _msgSubject;
public IObservable<Msg> Messages { get; private set; }

public SocketServer()
{
    // ...
    // Initialization of server socket
    // ...
    _msgSubject = new Subject<IObservable<Msg>>();
    Messages = _msgSubject.Merge();
}

private void OnNewConnection(ISocket socket)
{
    var evtObservable = Observable.FromEvent<Action<byte[]>, byte[]>(action => action.Invoke, h => socket.OnMessage += h, h => socket.OnMessage -= h);

    _msgSubject.OnNext(evtObservable);
}

现在,我检查了内存的(取消)分配,问题是即使正确关闭了套接字,仍然有相关可观察对象的引用添加到主题中;此外,永远不会调用事件的注销。

所以,问题来了:有没有办法强制从主题中删除 "socket observable"?

也许触发 socket observable 的 OnComplete 的东西应该可以完成这项工作,但是怎么做呢?

目前,您无论如何都不会公开以表示内部 Observable 序列的终止(OnCompleteOnError)。 如果消费者碰巧处理订阅,您允许注销事件处理程序,但这是消费者驱动的。

您是否有任何在 ISocket 接口上引发的表示套接字关闭的事件? 如果是这样,您可以在其周围添加一个 Observable.FromEvent 包装器,然后使用该可观察序列来终止 OnMessage 序列。

var closeObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
    action => action.Invoke, 
    h => socket.OnClose+= h, 
    h => socket.OnClose -= h);

var msgObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
    action => action.Invoke, 
    h => socket.OnMessage += h, 
    h => socket.OnMessage -= h);

msgObservable.TakeUntil(closeObservable)