ZeroMQ C++ 中的多个发布者这是一个好的选择吗?
Multiple publisher in ZeroMQ C++ Is this a good choice or not?
我是 ZeroMQ 的新手。我想创建多个发布者,每个发布者都发布特定的数据,例如:
- 发布者 1:发布图像数据
- 发布者 2:发布音频数据
- 发布者 3:发布文本数据
基本上,我的要求是从多个发布者发布数据,并在另一端使用多个接收者接收。
请看下面的示例代码:
data_publisher.cpp
// Prepare our context and all publishers
zmq::context_t context(1);
zmq::socket_t publisher1(context, ZMQ_PUB);
zmq::socket_t publisher2(context, ZMQ_PUB);
zmq::socket_t publisher3(context, ZMQ_PUB);
zmq::socket_t publisher4(context, ZMQ_PUB);
publisher1.bind("tcp://*:5556");
publisher2.bind("tcp://*:5557");
publisher3.bind("tcp://*:5558");
publisher4.bind("tcp://*:5559");
// Initialize random number generator
srandom((unsigned)time(NULL));
while (1) {
// sample data
int zipcode1 = within(100000);
int zipcode2 = within(100000);
int zipcode3 = within(100000);
int zipcode4 = within(100000);
int temperature1 = within(215) - 80;
int temperature2 = within(215) - 80;
int temperature3 = within(215) - 80;
int temperature4 = within(215) - 80;
int relhumidity1 = within(50) + 10;
int relhumidity2 = within(50) + 10;
int relhumidity3 = within(50) + 10;
int relhumidity4 = within(50) + 10;
zmq::message_t message1(20);
zmq::message_t message2(20);
zmq::message_t message3(20);
zmq::message_t message4(20);
snprintf((char*)message1.data(), 20, "%05d %d %d", zipcode1, temperature1, relhumidity1);
snprintf((char*)message2.data(), 20, "%05d %d %d", zipcode2, temperature2, relhumidity2);
snprintf((char*)message3.data(), 20, "%05d %d %d", zipcode3, temperature3, relhumidity3);
snprintf((char*)message4.data(), 20, "%05d %d %d", zipcode4, temperature4, relhumidity4);
publisher1.send(message1);
publisher2.send(message2);
publisher3.send(message3);
publisher4.send(message4);
}
data_receiver.cpp
zmq::context_t context(1);
// Socket to talk to server
zmq::socket_t subscriber1(context, ZMQ_SUB);
zmq::socket_t subscriber2(context, ZMQ_SUB);
zmq::socket_t subscriber3(context, ZMQ_SUB);
zmq::socket_t subscriber4(context, ZMQ_SUB);
subscriber1.connect("tcp://localhost:5556");
subscriber2.connect("tcp://localhost:5557");
subscriber3.connect("tcp://localhost:5558");
subscriber4.connect("tcp://localhost:5559");
const char* filter = (argc > 1) ? argv[1] : "10001 ";
subscriber1.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber2.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber3.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber4.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
// Process 100 updates
int update_nbr;
long total_temp1 = 0;
long total_temp2 = 0;
long total_temp3 = 0;
long total_temp4 = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++)
{
zmq::message_t update1;
zmq::message_t update2;
zmq::message_t update3;
zmq::message_t update4;
int zipcode1, temperature1, relhumidity1;
int zipcode2, temperature2, relhumidity2;
int zipcode3, temperature3, relhumidity3;
int zipcode4, temperature4, relhumidity4;
subscriber1.recv(&update1);
subscriber2.recv(&update2);
subscriber3.recv(&update3);
subscriber4.recv(&update4);
std::istringstream iss1(static_cast<char*>(update1.data()));
std::istringstream iss2(static_cast<char*>(update2.data()));
std::istringstream iss3(static_cast<char*>(update3.data()));
std::istringstream iss4(static_cast<char*>(update4.data()));
iss1 >> zipcode1 >> temperature1 >> relhumidity1;
iss2 >> zipcode2 >> temperature2 >> relhumidity2;
iss3 >> zipcode3 >> temperature3 >> relhumidity3;
iss4 >> zipcode4 >> temperature4 >> relhumidity4;
total_temp1 += temperature1;
total_temp2 += temperature2;
total_temp3 += temperature3;
total_temp4 += temperature4;
}
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp1 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp2 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp3 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp4 / update_nbr) << "F" << std::endl;
请注意,以上代码是获取建议/建议的示例代码。
我想知道上面的示例代码是不是一个好的选择?
仍在等待任何量化事实,但让我们开始吧:
ZeroMQ 是一个使用智能启用工具的概念,而低级系统编程被 ZeroMQ 核心元素隐藏,Context
-引擎。
这就是说,正式化为可扩展正式通信模式原型的高级工具提供了某种模仿人类的行为——PUB
出版商确实 "publish"、 SUB
订阅者可以 "subscribe", REQ
请求者可以 "request", REP
回帖确实可以"reply",等等
这些具有行为的接入点可以 .bind()/.connect()
进入某种分布式行为基础设施,一旦被授予一些基本规则。这样的规则之一就是不要理会实际的传输-类',所有这些确实都是功能丰富的技术,目前跨越 { inproc:// | ipc:// | tcp:// | pgm:// | epgm:// | vmci:// }
的景观,低级细节,Context()
-实例将处理所有这些
对您的高层行为透明。忘记这个吧。另一条规则是,您可以确定发送的每条消息要么没有错误地发送,要么根本没有发送 - 没有妥协,没有发送过折磨的垃圾来愚弄或崩溃收件人的 AccessPoint post-processing。
由于未能理解这一点,ZeroMQ 并未最大限度地为我们提供设计到这个豪华工具中的舒适性和强大功能。
回到你的困境:
说了上面几点,你的初级架构还不是很清楚,这里还是可以帮到你的。
ZeroMQ 抽象的分布式行为套接字工具主要是一个纯[SERIAL]
调度设备。这意味着,none 与套接字关联的任何接收接入点 { .bind() | .connect() }
可以期望任意重新排序消息的纯顺序流。
这意味着,在任何情况下,如果 "just"-[CONCURRENT]
进程调度,或者在极端情况下,真正-[PARALLEL]
process-scheduling 是技术上编排的,单个 "pure"-[SERIAL]
交付渠道不允许{ [CONCURRENT] | [PARALLEL] }
-系统继续提供这种流程调度模式,并将事件/处理流程分成 "pure"-[SERIAL]
序列留言。
A ) 这确实 可能是引入多个独立操作的 ZeroMQ 分布式行为 Socket 实例的原因和必须。
B ) 另一方面,由于对全球分布式系统的行为一无所知,还没有人能确定 ,进入多个独立操作的 Socket 实例是否不仅仅是浪费时间和资源,是否由于极度错误或完全缺失的初始工程决策而提供不合理的低于平均水平或不可接受的不良端到端系统行为性能。
性能?
不要在这个域中猜测,永远不要。而是首先从定量声明的需求开始,在此基础上,技术上合理的设计将能够继续并定义资源映射和性能调整到平台限制所需的所有步骤。
ZeroMQ 在过去的 20 年里以极致的性能特征出色地完成了这项工作,设计与工程团队在完善可扩展性和性能范围方面做了大量工作,同时将延迟保持在难以实现的水平由一个临时程序员。确实是隐藏在 ZeroMQ 地下室中的一个很棒的系统编程。
"数据量巨大" -- 好的,定义 大小 -- 交付1E+9
大小为 1 [B]
的消息除了传递具有 的 1E+3 条消息之外还有其他性能调整1.000.000 [B]
大小。
"as fast as possible" -- 好的,定义fast 给定大小 和预期的消息节奏 1/s ~ 1 [Hz]
, 10/s ~ 10 [Hz]
, 1000/s ~ 1 [kHz]
当然,在某些特定情况下,这种混合需求可能会超出当代计算设备的能力范围。在开始任何编程之前,必须对其进行最好的审查,因为否则你只是破坏了一些永远不会飞的东西的编程工作,所以最好有一个解决方案架构在一个范围内可行和可行的积极证据可接受的资源和成本范围。
因此,如果您的项目需要某些东西,首先定义并定量指定实际是什么,接下来解决方案架构可以开始整理并提供决策,哪些工具和哪些工具配置可以匹配定义的功能目标级别和性能目标。
盖房子,从加高屋顶开始永远不会回答如何布置地下室墙的问题,什么是足够但又不过分设计的钢筋混凝土铠装厚度,这将承载数量不详的高层建筑。已经盖好屋顶很容易出现,但与系统严谨的设计和工程实践无关。
我是 ZeroMQ 的新手。我想创建多个发布者,每个发布者都发布特定的数据,例如:
- 发布者 1:发布图像数据
- 发布者 2:发布音频数据
- 发布者 3:发布文本数据
基本上,我的要求是从多个发布者发布数据,并在另一端使用多个接收者接收。
请看下面的示例代码:
data_publisher.cpp
// Prepare our context and all publishers
zmq::context_t context(1);
zmq::socket_t publisher1(context, ZMQ_PUB);
zmq::socket_t publisher2(context, ZMQ_PUB);
zmq::socket_t publisher3(context, ZMQ_PUB);
zmq::socket_t publisher4(context, ZMQ_PUB);
publisher1.bind("tcp://*:5556");
publisher2.bind("tcp://*:5557");
publisher3.bind("tcp://*:5558");
publisher4.bind("tcp://*:5559");
// Initialize random number generator
srandom((unsigned)time(NULL));
while (1) {
// sample data
int zipcode1 = within(100000);
int zipcode2 = within(100000);
int zipcode3 = within(100000);
int zipcode4 = within(100000);
int temperature1 = within(215) - 80;
int temperature2 = within(215) - 80;
int temperature3 = within(215) - 80;
int temperature4 = within(215) - 80;
int relhumidity1 = within(50) + 10;
int relhumidity2 = within(50) + 10;
int relhumidity3 = within(50) + 10;
int relhumidity4 = within(50) + 10;
zmq::message_t message1(20);
zmq::message_t message2(20);
zmq::message_t message3(20);
zmq::message_t message4(20);
snprintf((char*)message1.data(), 20, "%05d %d %d", zipcode1, temperature1, relhumidity1);
snprintf((char*)message2.data(), 20, "%05d %d %d", zipcode2, temperature2, relhumidity2);
snprintf((char*)message3.data(), 20, "%05d %d %d", zipcode3, temperature3, relhumidity3);
snprintf((char*)message4.data(), 20, "%05d %d %d", zipcode4, temperature4, relhumidity4);
publisher1.send(message1);
publisher2.send(message2);
publisher3.send(message3);
publisher4.send(message4);
}
data_receiver.cpp
zmq::context_t context(1);
// Socket to talk to server
zmq::socket_t subscriber1(context, ZMQ_SUB);
zmq::socket_t subscriber2(context, ZMQ_SUB);
zmq::socket_t subscriber3(context, ZMQ_SUB);
zmq::socket_t subscriber4(context, ZMQ_SUB);
subscriber1.connect("tcp://localhost:5556");
subscriber2.connect("tcp://localhost:5557");
subscriber3.connect("tcp://localhost:5558");
subscriber4.connect("tcp://localhost:5559");
const char* filter = (argc > 1) ? argv[1] : "10001 ";
subscriber1.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber2.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber3.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber4.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
// Process 100 updates
int update_nbr;
long total_temp1 = 0;
long total_temp2 = 0;
long total_temp3 = 0;
long total_temp4 = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++)
{
zmq::message_t update1;
zmq::message_t update2;
zmq::message_t update3;
zmq::message_t update4;
int zipcode1, temperature1, relhumidity1;
int zipcode2, temperature2, relhumidity2;
int zipcode3, temperature3, relhumidity3;
int zipcode4, temperature4, relhumidity4;
subscriber1.recv(&update1);
subscriber2.recv(&update2);
subscriber3.recv(&update3);
subscriber4.recv(&update4);
std::istringstream iss1(static_cast<char*>(update1.data()));
std::istringstream iss2(static_cast<char*>(update2.data()));
std::istringstream iss3(static_cast<char*>(update3.data()));
std::istringstream iss4(static_cast<char*>(update4.data()));
iss1 >> zipcode1 >> temperature1 >> relhumidity1;
iss2 >> zipcode2 >> temperature2 >> relhumidity2;
iss3 >> zipcode3 >> temperature3 >> relhumidity3;
iss4 >> zipcode4 >> temperature4 >> relhumidity4;
total_temp1 += temperature1;
total_temp2 += temperature2;
total_temp3 += temperature3;
total_temp4 += temperature4;
}
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp1 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp2 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp3 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp4 / update_nbr) << "F" << std::endl;
请注意,以上代码是获取建议/建议的示例代码。
我想知道上面的示例代码是不是一个好的选择?
仍在等待任何量化事实,但让我们开始吧:
ZeroMQ 是一个使用智能启用工具的概念,而低级系统编程被 ZeroMQ 核心元素隐藏,Context
-引擎。
这就是说,正式化为可扩展正式通信模式原型的高级工具提供了某种模仿人类的行为——PUB
出版商确实 "publish"、 SUB
订阅者可以 "subscribe", REQ
请求者可以 "request", REP
回帖确实可以"reply",等等
这些具有行为的接入点可以 .bind()/.connect()
进入某种分布式行为基础设施,一旦被授予一些基本规则。这样的规则之一就是不要理会实际的传输-类',所有这些确实都是功能丰富的技术,目前跨越 { inproc:// | ipc:// | tcp:// | pgm:// | epgm:// | vmci:// }
的景观,低级细节,Context()
-实例将处理所有这些
对您的高层行为透明。忘记这个吧。另一条规则是,您可以确定发送的每条消息要么没有错误地发送,要么根本没有发送 - 没有妥协,没有发送过折磨的垃圾来愚弄或崩溃收件人的 AccessPoint post-processing。
由于未能理解这一点,ZeroMQ 并未最大限度地为我们提供设计到这个豪华工具中的舒适性和强大功能。
回到你的困境:
说了上面几点,你的初级架构还不是很清楚,这里还是可以帮到你的。
ZeroMQ 抽象的分布式行为套接字工具主要是一个纯[SERIAL]
调度设备。这意味着,none 与套接字关联的任何接收接入点 { .bind() | .connect() }
可以期望任意重新排序消息的纯顺序流。
这意味着,在任何情况下,如果 "just"-[CONCURRENT]
进程调度,或者在极端情况下,真正-[PARALLEL]
process-scheduling 是技术上编排的,单个 "pure"-[SERIAL]
交付渠道不允许{ [CONCURRENT] | [PARALLEL] }
-系统继续提供这种流程调度模式,并将事件/处理流程分成 "pure"-[SERIAL]
序列留言。
A ) 这确实 可能是引入多个独立操作的 ZeroMQ 分布式行为 Socket 实例的原因和必须。
B ) 另一方面,由于对全球分布式系统的行为一无所知,还没有人能确定 ,进入多个独立操作的 Socket 实例是否不仅仅是浪费时间和资源,是否由于极度错误或完全缺失的初始工程决策而提供不合理的低于平均水平或不可接受的不良端到端系统行为性能。
性能?
不要在这个域中猜测,永远不要。而是首先从定量声明的需求开始,在此基础上,技术上合理的设计将能够继续并定义资源映射和性能调整到平台限制所需的所有步骤。
ZeroMQ 在过去的 20 年里以极致的性能特征出色地完成了这项工作,设计与工程团队在完善可扩展性和性能范围方面做了大量工作,同时将延迟保持在难以实现的水平由一个临时程序员。确实是隐藏在 ZeroMQ 地下室中的一个很棒的系统编程。
"数据量巨大" -- 好的,定义 大小 -- 交付1E+9
大小为 1 [B]
的消息除了传递具有 的 1E+3 条消息之外还有其他性能调整1.000.000 [B]
大小。
"as fast as possible" -- 好的,定义fast 给定大小 和预期的消息节奏 1/s ~ 1 [Hz]
, 10/s ~ 10 [Hz]
, 1000/s ~ 1 [kHz]
当然,在某些特定情况下,这种混合需求可能会超出当代计算设备的能力范围。在开始任何编程之前,必须对其进行最好的审查,因为否则你只是破坏了一些永远不会飞的东西的编程工作,所以最好有一个解决方案架构在一个范围内可行和可行的积极证据可接受的资源和成本范围。
因此,如果您的项目需要某些东西,首先定义并定量指定实际是什么,接下来解决方案架构可以开始整理并提供决策,哪些工具和哪些工具配置可以匹配定义的功能目标级别和性能目标。
盖房子,从加高屋顶开始永远不会回答如何布置地下室墙的问题,什么是足够但又不过分设计的钢筋混凝土铠装厚度,这将承载数量不详的高层建筑。已经盖好屋顶很容易出现,但与系统严谨的设计和工程实践无关。