ZeroMQ (cppzmq) subscriber skips first message
I'm trying to use ZMQ with the CPPZMQ C++ wrapper, as it seems to be the one suggested in C++ Bindings.
Client/server (REQ/REP) seems to work fine.
When I try to implement a pair of publish/subscribe programs, the first message seems to get lost in the subscriber. Why?
publisher.cpp:
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp>
#include <boost/format.hpp>
#include <zmq.hpp>
#include <cstring>  // memcpy
#include <string>
#include <iostream>
int main()
{
    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_PUB);
    publisher.bind("tcp://*:5555");
    for(int n = 0; n < 3; n++) {
        // Topic "A": a one-byte envelope frame, then the payload frame
        zmq::message_t env1(1);
        memcpy(env1.data(), "A", 1);
        std::string msg1_str = (boost::format("Hello-%i") % (n + 1)).str();
        zmq::message_t msg1(msg1_str.size());
        memcpy(msg1.data(), msg1_str.c_str(), msg1_str.size());
        std::cout << "Sending '" << msg1_str << "' on topic A" << std::endl;
        publisher.send(env1, ZMQ_SNDMORE);
        publisher.send(msg1);
        // Topic "B": same two-frame layout
        zmq::message_t env2(1);
        memcpy(env2.data(), "B", 1);
        std::string msg2_str = (boost::format("World-%i") % (n + 1)).str();
        zmq::message_t msg2(msg2_str.size());
        memcpy(msg2.data(), msg2_str.c_str(), msg2_str.size());
        std::cout << "Sending '" << msg2_str << "' on topic B" << std::endl;
        publisher.send(env2, ZMQ_SNDMORE);
        publisher.send(msg2);
        // Wait one second before the next round
        boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }
    return 0;
}
subscriber.cpp:
#include <zmq.hpp>
#include <string>
#include <iostream>
int main()
{
    zmq::context_t context(1);
    zmq::socket_t subscriber(context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5555");
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "B", 1);
    while(true)
    {
        zmq::message_t env;
        subscriber.recv(&env);
        std::string env_str = std::string(static_cast<char*>(env.data()), env.size());
        std::cout << "Received envelope '" << env_str << "'" << std::endl;
        zmq::message_t msg;
        subscriber.recv(&msg);
        std::string msg_str = std::string(static_cast<char*>(msg.data()), msg.size());
        std::cout << "Received '" << msg_str << "'" << std::endl;
    }
    return 0;
}
Program output:
$ ./publisher
Sending 'Hello-1' on topic A
Sending 'World-1' on topic B
Sending 'Hello-2' on topic A
Sending 'World-2' on topic B
Sending 'Hello-3' on topic A
Sending 'World-3' on topic B
$ ./subscriber
Received envelope 'B'
Received 'World-2'
Received envelope 'B'
Received 'World-3'
(Note: the subscriber was started before the publisher.)
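Bonus question: by the way, is it just my impression, or is this C++ wrapper rather low-level? I don't see any direct support for std::string, and the code needed to transmit a simple string looks quite verbose.
Bonus answer: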
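ZeroMQ is designed for high-performance messaging/signalling and therefore follows a few design maxims, around which its core has been developed.
Zero-copy and zero-sharing are the better-known ones. Zero (almost) latency may be (a bit) provocative, and zero-guarantee is probably the one you would least like to hear about.
Yes, ZeroMQ makes no effort to provide any explicit guarantees (naturally, for the many reasons common in the world of distributed systems). Yet it does give you one such guarantee: any message is either delivered atomically (i.e. complete and error-free) or not delivered at all. So one never has to pay any extra cost for detecting and discarding truncated and/or corrupted message payloads.
So you may as well stop worrying about undelivered packets, what would have happened had they been delivered, and so on. You simply get as many as possible, and the rest is out of your hands. The "late-joiner" case could be taken as a borderline one: even where one is in a position to enforce more time for the "slow joiner(s)", none of the observable differences would change the code design. So rather try to design distributed systems that are robust to (in principle) possibly undelivered signals/messages.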
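API? Wrappers...
If you are interested in this level of detail, it is worth reading the native API (since roughly v2.x). That way one can better understand all the ideas put behind the effort for maximum performance: a zero-copy-driven set of message-preparation steps, advanced API calls for messages that are going to be re-sent, memory-leak prevention, and advanced IO-thread-pool mapping for higher IO throughput, lower latency, and relative prioritisation.
After that, one may review how well (or how poorly) any corresponding non-native language binding (wrapper) reflects these initial design efforts in the cross-ported programming environment.
Most such efforts struggle to find a reasonable balance between user programming comfort, the expressiveness constraints of the target programming environment, and keeping memory leaks to a minimum without compromising the quality of the API binding/wrapper.
To be fair, designing a non-native language binding is among the most challenging tasks, so one should be tolerant of the brave teams who decide to step into this territory. Such bindings sometimes fail to reflect all the strengths of the native API without degrading performance and/or the clarity of the original intent. Needless to say, many native API features may even be left out, because they cannot be integrated seamlessly within the expressiveness of such a non-native language. So be careful when evaluating an API binding/wrapper; going back to the original native API will always help you find the roots of ZeroMQ's raw power. In the most extreme cases, one may try to inline the native calls in the critical sections.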
Found the answer in the ZeroMQ Guide:
There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.
This "slow joiner" symptom hits enough people often enough that we're going to explain it in detail. Remember that ZeroMQ does asynchronous I/O, i.e., in the background. Say you have two nodes doing this, in this order:
Subscriber connects to an endpoint and receives and counts messages.
Publisher binds to an endpoint and immediately sends 1,000 messages.
Then the subscriber will most likely not receive anything. You'll blink, check that you set a correct filter and try again, and the subscriber will still not receive anything.
Making a TCP connection involves to and from handshaking that takes several milliseconds depending on your network and the number of hops between peers. In that time, ZeroMQ can send many messages. For sake of argument assume it takes 5 msecs to establish a connection, and that same link can handle 1M messages per second. During the 5 msecs that the subscriber is connecting to the publisher, it takes the publisher only 1 msec to send out those 1K messages.
In Chapter 2 - Sockets and Patterns we'll explain how to synchronize a publisher and subscribers so that you don't start to publish data until the subscribers really are connected and ready. There is a simple and stupid way to delay the publisher, which is to sleep. Don't do this in a real application, though, because it is extremely fragile as well as inelegant and slow. Use sleeps to prove to yourself what's happening, and then wait for Chapter 2 - Sockets and Patterns to see how to do this right.
The alternative to synchronization is to simply assume that the published data stream is infinite and has no start and no end. One also assumes that the subscriber doesn't care what transpired before it started up. This is how we built our weather client example.
So the client subscribes to its chosen zip code and collects 100 updates for that zip code. That means about ten million updates from the server, if zip codes are randomly distributed. You can start the client, and then the server, and the client will keep working. You can stop and restart the server as often as you like, and the client will keep working. When the client has collected its hundred updates, it calculates the average, prints it, and exits.