从事件中处理组合的可观察量
Disposing combined observables from events
我正在编写一个套接字服务器,它应该处理从连接到它的每个客户端接收到的每条消息。
所有消息都在一个可观察对象中排队,因此可以在外部订阅和观察。
为了确保所有客户端套接字消息都放在同一个可观察对象上,我使用了以下代码片段:
private Subject<IObservable<Msg>> _msgSubject;
public IObservable<Msg> Messages { get; private set; }
public SocketServer()
{
// ...
// Initialization of server socket
// ...
_msgSubject = new Subject<IObservable<Msg>>();
Messages = _msgSubject.Merge();
}
private void OnNewConnection(ISocket socket)
{
var evtObservable = Observable.FromEvent<Action<byte[]>, byte[]>(action => action.Invoke, h => socket.OnMessage += h, h => socket.OnMessage -= h);
_msgSubject.OnNext(evtObservable);
}
现在,我检查了内存的(取消)分配,问题是即使正确关闭了套接字,仍然有相关可观察对象的引用添加到主题中;此外,永远不会调用事件的注销。
所以,问题来了:有没有办法强制从主题中删除 "socket observable"?
也许触发 socket observable 的 OnComplete 的东西应该可以完成这项工作,但是怎么做呢?
目前,您无论如何都不会公开以表示内部 Observable 序列的终止(OnComplete
或 OnError
)。
如果消费者碰巧处理订阅,您允许注销事件处理程序,但这是消费者驱动的。
您是否有任何在 ISocket
接口上引发的表示套接字关闭的事件?
如果是这样,您可以在其周围添加一个 Observable.FromEvent
包装器,然后使用该可观察序列来终止 OnMessage
序列。
var closeObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
action => action.Invoke,
h => socket.OnClose+= h,
h => socket.OnClose -= h);
var msgObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
action => action.Invoke,
h => socket.OnMessage += h,
h => socket.OnMessage -= h);
msgObservable.TakeUntil(closeObservable)
我正在编写一个套接字服务器,它应该处理从连接到它的每个客户端接收到的每条消息。
所有消息都在一个可观察对象中排队,因此可以在外部订阅和观察。
为了确保所有客户端套接字消息都放在同一个可观察对象上,我使用了以下代码片段:
private Subject<IObservable<Msg>> _msgSubject;
public IObservable<Msg> Messages { get; private set; }
public SocketServer()
{
// ...
// Initialization of server socket
// ...
_msgSubject = new Subject<IObservable<Msg>>();
Messages = _msgSubject.Merge();
}
private void OnNewConnection(ISocket socket)
{
var evtObservable = Observable.FromEvent<Action<byte[]>, byte[]>(action => action.Invoke, h => socket.OnMessage += h, h => socket.OnMessage -= h);
_msgSubject.OnNext(evtObservable);
}
现在,我检查了内存的(取消)分配,问题是即使正确关闭了套接字,仍然有相关可观察对象的引用添加到主题中;此外,永远不会调用事件的注销。
所以,问题来了:有没有办法强制从主题中删除 "socket observable"?
也许触发 socket observable 的 OnComplete 的东西应该可以完成这项工作,但是怎么做呢?
目前,您无论如何都不会公开以表示内部 Observable 序列的终止(OnComplete
或 OnError
)。
如果消费者碰巧处理订阅,您允许注销事件处理程序,但这是消费者驱动的。
您是否有任何在 ISocket
接口上引发的表示套接字关闭的事件?
如果是这样,您可以在其周围添加一个 Observable.FromEvent
包装器,然后使用该可观察序列来终止 OnMessage
序列。
var closeObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
action => action.Invoke,
h => socket.OnClose+= h,
h => socket.OnClose -= h);
var msgObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
action => action.Invoke,
h => socket.OnMessage += h,
h => socket.OnMessage -= h);
msgObservable.TakeUntil(closeObservable)