在 NetMQ 中通过 ROUTER-ROUTER 重新连接后未传递消息
Messages are not delivered after reconnect over ROUTER-ROUTER in NetMQ
我与 NetMQ 的 ROUTER-ROUTER 通信有问题。
两个应用程序,客户端和服务器。两者都使用 ROUTER 套接字。客户端 ROUTER 套接字没有明确设置它的身份,只有服务器。
第一次建立连接后,消息从客户端(ROUTER)路由到服务器(ROUTER)。
但是一旦客户端调用 Disconnect() 然后再次调用 Connect()(在很短的时间后,即 10 毫秒),服务器就不再接收消息了。
有趣的是,如果使用 ZeroMq C# lib,重新连接工作正常。
这是一个已知问题还是我做错了什么?
重现问题的最少代码:
public class PureNetMQ
{
private static readonly byte[] ServerIdentity = {1, 2, 3};
private static readonly byte[] ClientIdentity = {1, 2, 4};
private static readonly string Address = "tcp://127.0.0.1:5000";
private static void Main(string[] args)
{
var cancellationTokenSource = new CancellationTokenSource();
Task.Factory.StartNew(_ => RunServer(cancellationTokenSource.Token), cancellationTokenSource.Token, TaskCreationOptions.LongRunning);
Thread.Sleep(TimeSpan.FromSeconds(2));
RunClient();
Console.WriteLine("Done");
Console.ReadLine();
cancellationTokenSource.Cancel(true);
}
private static void RunClient()
{
using (var context = NetMQContext.Create())
{
using (NetMQSocket socket = context.CreateRouterSocket())
{
//socket.Options.Identity = Guid.NewGuid().ToByteArray();
//socket.Options.RouterMandatory = true;
socket.Options.Linger = TimeSpan.Zero;
socket.Connect(Address);
Thread.Sleep(TimeSpan.FromSeconds(2));
socket.SendMore(ServerIdentity);
socket.SendMore(Encoding.UTF8.GetBytes(""));
socket.Send(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()));
Console.WriteLine("Message sent...");
socket.Disconnect(Address);
Thread.Sleep(TimeSpan.FromSeconds(5));
Console.WriteLine("Disconnected");
//socket.Options.Identity = Guid.NewGuid().ToByteArray();
socket.Connect(Address);
Thread.Sleep(TimeSpan.FromSeconds(5));
Console.WriteLine("Reconnected");
socket.SendMore(ServerIdentity);
socket.SendMore(Encoding.UTF8.GetBytes(""));
socket.Send(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()));
Console.WriteLine("Message sent...");
Thread.Sleep(TimeSpan.FromSeconds(5));
}
}
}
private static void RunServer(CancellationToken token)
{
using (var context = NetMQContext.Create())
{
using (var socket = context.CreateRouterSocket())
{
socket.Options.Identity = ServerIdentity;
//socket.Options.RouterMandatory = true;
socket.Options.Linger = TimeSpan.Zero;
socket.Bind(Address);
while (!token.IsCancellationRequested)
{
var message = socket.ReceiveMessage();
for (var i = 2; i < message.FrameCount; i++)
{
Console.WriteLine("Message: {0}", Encoding.UTF8.GetString(message[i].Buffer));
}
}
}
}
}
}
Zeromq 对此进行了修复,但尚未移植到 NetMQ。我现在移植了,你可以使用它:
https://github.com/somdoron/netmq
它很快应该被合并到 master 中:
https://github.com/zeromq/netmq/pull/373
ZeroMQ 中解决的另一个相关问题是路由器中重复身份的处理。默认情况下,忽略具有标识的第二个套接字。 ZeroMQ 有切换选项,如果启用新套接字接管旧套接字。您应该考虑使用它或将其移植到 netmq。
https://github.com/zeromq/libzmq/pull/729/files
https://github.com/zeromq/libzmq/pull/731
我与 NetMQ 的 ROUTER-ROUTER 通信有问题。
两个应用程序,客户端和服务器。两者都使用 ROUTER 套接字。客户端 ROUTER 套接字没有明确设置它的身份,只有服务器。
第一次建立连接后,消息从客户端(ROUTER)路由到服务器(ROUTER)。 但是一旦客户端调用 Disconnect() 然后再次调用 Connect()(在很短的时间后,即 10 毫秒),服务器就不再接收消息了。
有趣的是,如果使用 ZeroMq C# lib,重新连接工作正常。
这是一个已知问题还是我做错了什么?
重现问题的最少代码:
public class PureNetMQ
{
private static readonly byte[] ServerIdentity = {1, 2, 3};
private static readonly byte[] ClientIdentity = {1, 2, 4};
private static readonly string Address = "tcp://127.0.0.1:5000";
private static void Main(string[] args)
{
var cancellationTokenSource = new CancellationTokenSource();
Task.Factory.StartNew(_ => RunServer(cancellationTokenSource.Token), cancellationTokenSource.Token, TaskCreationOptions.LongRunning);
Thread.Sleep(TimeSpan.FromSeconds(2));
RunClient();
Console.WriteLine("Done");
Console.ReadLine();
cancellationTokenSource.Cancel(true);
}
private static void RunClient()
{
using (var context = NetMQContext.Create())
{
using (NetMQSocket socket = context.CreateRouterSocket())
{
//socket.Options.Identity = Guid.NewGuid().ToByteArray();
//socket.Options.RouterMandatory = true;
socket.Options.Linger = TimeSpan.Zero;
socket.Connect(Address);
Thread.Sleep(TimeSpan.FromSeconds(2));
socket.SendMore(ServerIdentity);
socket.SendMore(Encoding.UTF8.GetBytes(""));
socket.Send(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()));
Console.WriteLine("Message sent...");
socket.Disconnect(Address);
Thread.Sleep(TimeSpan.FromSeconds(5));
Console.WriteLine("Disconnected");
//socket.Options.Identity = Guid.NewGuid().ToByteArray();
socket.Connect(Address);
Thread.Sleep(TimeSpan.FromSeconds(5));
Console.WriteLine("Reconnected");
socket.SendMore(ServerIdentity);
socket.SendMore(Encoding.UTF8.GetBytes(""));
socket.Send(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()));
Console.WriteLine("Message sent...");
Thread.Sleep(TimeSpan.FromSeconds(5));
}
}
}
private static void RunServer(CancellationToken token)
{
using (var context = NetMQContext.Create())
{
using (var socket = context.CreateRouterSocket())
{
socket.Options.Identity = ServerIdentity;
//socket.Options.RouterMandatory = true;
socket.Options.Linger = TimeSpan.Zero;
socket.Bind(Address);
while (!token.IsCancellationRequested)
{
var message = socket.ReceiveMessage();
for (var i = 2; i < message.FrameCount; i++)
{
Console.WriteLine("Message: {0}", Encoding.UTF8.GetString(message[i].Buffer));
}
}
}
}
}
}
Zeromq 对此进行了修复,但尚未移植到 NetMQ。我现在移植了,你可以使用它:
https://github.com/somdoron/netmq
它很快应该被合并到 master 中: https://github.com/zeromq/netmq/pull/373
ZeroMQ 中解决的另一个相关问题是路由器中重复身份的处理。默认情况下,忽略具有标识的第二个套接字。 ZeroMQ 有切换选项,如果启用新套接字接管旧套接字。您应该考虑使用它或将其移植到 netmq。
https://github.com/zeromq/libzmq/pull/729/files https://github.com/zeromq/libzmq/pull/731