Observable TcpListener 在单连接后终止
Observable TcpListener terminates after single connection
我是 Rx 的新手,所以我可能在这里犯了一些基本错误。
我想创建一个非常简单的套接字服务器,它可以使用 Observables 从客户端接收消息。为此,我使用了 Rxx,它在 System.Net.Sockets 命名空间中提供了扩展方法,还提供了 ObserableTcpListener 静态工厂 class.
这是我目前所拥有的,几乎是从各种来源偷来的:
IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
TcpListener listener = new TcpListener(endpoint);
IObservable<TcpClient> clients = listener
.StartSocketObservable(1)
.SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket));
.Finally(listener.Stop)
clients.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
private static IObservable<TcpClient> SocketToTcpClient(Socket socket)
{
TcpClient client = new TcpClient();
client.Client = socket;
return Observable.Return<TcpClient>(client);
}
private static IObservable<byte[]> OnConnect(TcpClient client)
{
return client.Client.ReceiveUntilCompleted(SocketFlags.None);
}
private static void OnMessage(TcpClient client, byte[] message)
{
Console.WriteLine("Mesage Received! - {0}", Encoding.UTF8.GetString(message));
}
private static void OnCompleted(TcpClient client)
{
Console.WriteLine("Completed.");
}
private static void OnException(TcpClient client, Exception ex)
{
Console.WriteLine("Exception: {0}", ex.ToString());
}
这在某种程度上是有效的。我可以建立一个客户端连接。一旦该连接终止,看起来 Observable 序列就会终止并且 .Finally(listener.Stop)
被调用。显然,这不是我想要的。
我尝试使用 ObserableTcpListener.Start()
工厂 class,但得到的结果完全相同。
IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint);
sockets.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
我想我确实理解这里的问题:clients
可观察序列在第一个客户端终止后只是空的,因此调用了 .Finally(listener.Stop)
。
我需要做什么来避免这种情况?我怎样才能继续监听传入的连接?
让你的Observable
热门并在有订阅的情况下坚持下去。
IObservable<TcpClient> clients = listener
.StartSocketObservable(1)
.SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket))
.Finally(listener.Stop)
.Publish().RefCount();
我是 Rx 的新手,所以我可能在这里犯了一些基本错误。
我想创建一个非常简单的套接字服务器,它可以使用 Observables 从客户端接收消息。为此,我使用了 Rxx,它在 System.Net.Sockets 命名空间中提供了扩展方法,还提供了 ObserableTcpListener 静态工厂 class.
这是我目前所拥有的,几乎是从各种来源偷来的:
IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
TcpListener listener = new TcpListener(endpoint);
IObservable<TcpClient> clients = listener
.StartSocketObservable(1)
.SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket));
.Finally(listener.Stop)
clients.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
private static IObservable<TcpClient> SocketToTcpClient(Socket socket)
{
TcpClient client = new TcpClient();
client.Client = socket;
return Observable.Return<TcpClient>(client);
}
private static IObservable<byte[]> OnConnect(TcpClient client)
{
return client.Client.ReceiveUntilCompleted(SocketFlags.None);
}
private static void OnMessage(TcpClient client, byte[] message)
{
Console.WriteLine("Mesage Received! - {0}", Encoding.UTF8.GetString(message));
}
private static void OnCompleted(TcpClient client)
{
Console.WriteLine("Completed.");
}
private static void OnException(TcpClient client, Exception ex)
{
Console.WriteLine("Exception: {0}", ex.ToString());
}
这在某种程度上是有效的。我可以建立一个客户端连接。一旦该连接终止,看起来 Observable 序列就会终止并且 .Finally(listener.Stop)
被调用。显然,这不是我想要的。
我尝试使用 ObserableTcpListener.Start()
工厂 class,但得到的结果完全相同。
IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint);
sockets.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
我想我确实理解这里的问题:clients
可观察序列在第一个客户端终止后只是空的,因此调用了 .Finally(listener.Stop)
。
我需要做什么来避免这种情况?我怎样才能继续监听传入的连接?
让你的Observable
热门并在有订阅的情况下坚持下去。
IObservable<TcpClient> clients = listener
.StartSocketObservable(1)
.SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket))
.Finally(listener.Stop)
.Publish().RefCount();