NetMQ 订阅者使用已发布的消息进行阻塞
NetMQ subscriber blocking with a published message
我有一个消息发布服务器class,它在构造时使用以下代码创建一个 PUB 套接字:
this.context = NetMQContext.Create();
this.pubSocket = this.context.CreatePublisherSocket();
var portNumber = this.installerSettings.PublisherPort;
this.pubSocket.Bind("tcp://127.0.0.1:" + portNumber);
使用 messagePublishingServer.Publish(message)
发送消息执行:
this.pubSocket.SendMoreFrame(string.Empty).SendFrame(message);
以下 xBehave 测试...
[Scenario]
public void PublishMessageScenario()
{
MessagePublishingServer messagePublishingServer = null;
NetMQContext context;
NetMQ.Sockets.SubscriberSocket subSocket = null;
string receivedMessage = null;
"Given a running message publishing server"._(() =>
{
var installerSettingsManager = A.Fake<IInstallerSettingsManager>();
var settings = new InstallerSettings { PublisherPort = "5348" };
A.CallTo(() => installerSettingsManager.Settings).Returns(settings);
messagePublishingServer = new MessagePublishingServer(installerSettingsManager);
});
"And a subscriber connected to the publishing server"._(() =>
{
context = NetMQContext.Create();
subSocket = context.CreateSubscriberSocket();
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect("tcp://127.0.0.1:5348");
subSocket.Subscribe(string.Empty);
});
"When publishing a message"._(() =>
{
messagePublishingServer.Publish("test message");
// Receive the topic
subSocket.ReceiveFrameString();
// and the message
receivedMessage = subSocket.ReceiveFrameString();
});
"Then the subscriber must have received it"._(() =>
{
receivedMessage.Should().NotBeNullOrEmpty();
receivedMessage.Should().Be("test message");
});
}
...第一个 subSocket.ReceiveFrameString()
中的块,我发现这是意外的。订阅者套接字不应该在接收被调用之前对发布的消息进行排队吗?
来源(ReceivingSocketExtensions.cs
):
/// Receive a single frame from socket, blocking until one arrives, and decode as a string using ...
public static string ReceiveFrameString([NotNull] this IReceivingSocket socket)
和
/// If no message is immediately available, return <c>false</c>.
public static bool TryReceiveFrameString([NotNull] this IReceivingSocket socket, out string frameString)
发布者就像收音机,如果发布者发布时您没有连接和订阅,您就会错过消息。我的建议是在订阅者连接后休眠 100 毫秒(仅用于测试)。
我有一个消息发布服务器class,它在构造时使用以下代码创建一个 PUB 套接字:
this.context = NetMQContext.Create();
this.pubSocket = this.context.CreatePublisherSocket();
var portNumber = this.installerSettings.PublisherPort;
this.pubSocket.Bind("tcp://127.0.0.1:" + portNumber);
使用 messagePublishingServer.Publish(message)
发送消息执行:
this.pubSocket.SendMoreFrame(string.Empty).SendFrame(message);
以下 xBehave 测试...
[Scenario]
public void PublishMessageScenario()
{
MessagePublishingServer messagePublishingServer = null;
NetMQContext context;
NetMQ.Sockets.SubscriberSocket subSocket = null;
string receivedMessage = null;
"Given a running message publishing server"._(() =>
{
var installerSettingsManager = A.Fake<IInstallerSettingsManager>();
var settings = new InstallerSettings { PublisherPort = "5348" };
A.CallTo(() => installerSettingsManager.Settings).Returns(settings);
messagePublishingServer = new MessagePublishingServer(installerSettingsManager);
});
"And a subscriber connected to the publishing server"._(() =>
{
context = NetMQContext.Create();
subSocket = context.CreateSubscriberSocket();
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect("tcp://127.0.0.1:5348");
subSocket.Subscribe(string.Empty);
});
"When publishing a message"._(() =>
{
messagePublishingServer.Publish("test message");
// Receive the topic
subSocket.ReceiveFrameString();
// and the message
receivedMessage = subSocket.ReceiveFrameString();
});
"Then the subscriber must have received it"._(() =>
{
receivedMessage.Should().NotBeNullOrEmpty();
receivedMessage.Should().Be("test message");
});
}
...第一个 subSocket.ReceiveFrameString()
中的块,我发现这是意外的。订阅者套接字不应该在接收被调用之前对发布的消息进行排队吗?
来源(ReceivingSocketExtensions.cs ):
/// Receive a single frame from socket, blocking until one arrives, and decode as a string using ...
public static string ReceiveFrameString([NotNull] this IReceivingSocket socket)
和
/// If no message is immediately available, return <c>false</c>.
public static bool TryReceiveFrameString([NotNull] this IReceivingSocket socket, out string frameString)
发布者就像收音机,如果发布者发布时您没有连接和订阅,您就会错过消息。我的建议是在订阅者连接后休眠 100 毫秒(仅用于测试)。