如何使用 cppzmq 将 ZeroMQ 消息从 ROUTER 套接字发送到特定的 DEALER 套接字?
How can I send a ZeroMQ message from a ROUTER socket to a specific DEALER socket using cppzmq?
我把这个最小的例子放在一起,以便从路由器套接字发送消息到特定的 DEALER socker(它有它的标识集)。当 运行 这两个程序似乎挂在 ROUTER 上等待 DEALER 的回复,并且 DEALER 挂起等待来自 ROUTER 的请求。所以看起来 ROUTER 发送的消息永远不会到达 DEALER.
Router.cpp
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <thread>
#include <chrono>
int main() {
zmq::context_t context;
zmq::socket_t socket (context, zmq::socket_type::router);
// Enforce sending routable messages only
socket.setsockopt(ZMQ_ROUTER_MANDATORY, 1);
socket.bind("tcp://*:5555");
try {
std::string jobRequest = "ExampleJobRequest";
std::cout << "Router: Sending msg: " << jobRequest << std::endl;
// Set the address, then the empty delimiter and then the request itself
socket.send("PEER2", ZMQ_SNDMORE);
//socket.send(zmq::message_t(), ZMQ_SNDMORE);
socket.send(zmq::str_buffer("ExampleJobRequest")) ;
// Set the address, then the empty delimiter and then the request itself
socket.send("PEER2", ZMQ_SNDMORE);
//socket.send(zmq::message_t(), ZMQ_SNDMORE);
socket.send(zmq::str_buffer("ExampleJobRequest")) ;
// Receive the reply from the camera
std::cout << "Router: Waiting for reply from camera " << std::endl;
zmq::message_t reply;
socket.recv(&reply);
std::cout << "Router: Received " << std::string(static_cast<char*>(reply.data()), reply.size()) << std::endl;
} catch (std::exception e) {
std::cout << "Router Error: " << e.what();
}
std::this_thread::sleep_for(std::chrono::seconds(1));
socket.close();
context.close();
}
Dealer.cpp
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <thread>
int main (void)
{
// Prepare our context and socket
zmq::context_t context;
zmq::socket_t socket (context, zmq::socket_type::dealer);
std::cout << "Dealer: Connecting to RunJob server… \n";
socket.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
socket.connect ("tcp://localhost:5555");
while(true) {
try {
// Wait for next request from client
std::cout << "Dealer: Waiting for request" << std::endl;
zmq::message_t request;
zmq::message_t empty;
// Receive request
socket.recv(&request);
std::string requestString = std::string(static_cast<char*>(request.data()), request.size());
std::cout << "Dealer: Received request" << std::endl;
std::cout << requestString << std::endl;
// ZMQ_SNDMORE - "Specifies that the message being sent is a multi-part message, and that further message parts are to follow"
socket.send(zmq::str_buffer("Job completed"), zmq::send_flags::dontwait);
}catch (std::exception e) {
std::cout << "Router Error: " << e.what();
}
}
// Used to set various 0MQ Socket Settings
// ZMQ_Linger - Set linger period for socket shutdown
socket.setsockopt(ZMQ_LINGER, 0);
socket.close();
context.close();
return 0;
}
我原本认为我应该在消息前加上一个空分隔符,socket.send(zmq::message_t(), ZMQ_SNDMORE);
,但这导致了错误。同样使用以下内容也会导致在 try/catch 块中抛出错误。该错误只是打印 'Unknown error':
zmq::message_t delimiter(0);
socket.send(delimiter, ZMQ_SNDMORE);
使用以下创建定界符也会导致同样的错误:
socket.send(zmq::message_t(), ZMQ_SNDMORE);
据我所知,当使用 cppzmq 时,您不需要添加空分隔符(我对此可能是错误的,但在阅读和查看其他人的示例并测试我自己的代码后,这就是我确定)。
这是一个非常基本的图表,包含此设计的最终目标:
在我的研究中,我还没有找到这段代码的好例子。 Cppzmq github 的文档和示例很少。
以下是我看过的其他一些来源:
How to send message with zeroMq using Dealer
关于 ROUTER/DEALER 模式的主要思想是它是 REPLY/REQUEST 的异步概括。然而,您正试图反转模式中的套接字,发现它不适合并扭曲代码以尝试使其适合。不要那样做。
你需要做的是"go with the flow"。在存在示例的简单方法中,经销商应发送第一条消息。 ROUTER 然后对此做出响应。
下一级是经销商在其启动消息中标识自己。 ROUTER 然后可以给那个 DEALER 一个特定的响应。
在下一级别,您可以实现真正的异步。 ROUTER 可以获取每个 DEALER 的标识消息的副本,并使用消息副本随时向任何 DEALER 发送异步消息。识别消息的一份副本将附加 "PEER2" 帧并发送给经销商。这是有效的,因为消息的副本包括路由帧。理想情况下,您还可以去除 'message' 帧,只在副本中留下路由帧。
警告 - 我不使用 cppzmq,我使用 CZMQ。我可以说使用 CZMQ 这种帧操作非常容易。
我把这个最小的例子放在一起,以便从路由器套接字发送消息到特定的 DEALER socker(它有它的标识集)。当 运行 这两个程序似乎挂在 ROUTER 上等待 DEALER 的回复,并且 DEALER 挂起等待来自 ROUTER 的请求。所以看起来 ROUTER 发送的消息永远不会到达 DEALER.
Router.cpp
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <thread>
#include <chrono>
int main() {
zmq::context_t context;
zmq::socket_t socket (context, zmq::socket_type::router);
// Enforce sending routable messages only
socket.setsockopt(ZMQ_ROUTER_MANDATORY, 1);
socket.bind("tcp://*:5555");
try {
std::string jobRequest = "ExampleJobRequest";
std::cout << "Router: Sending msg: " << jobRequest << std::endl;
// Set the address, then the empty delimiter and then the request itself
socket.send("PEER2", ZMQ_SNDMORE);
//socket.send(zmq::message_t(), ZMQ_SNDMORE);
socket.send(zmq::str_buffer("ExampleJobRequest")) ;
// Set the address, then the empty delimiter and then the request itself
socket.send("PEER2", ZMQ_SNDMORE);
//socket.send(zmq::message_t(), ZMQ_SNDMORE);
socket.send(zmq::str_buffer("ExampleJobRequest")) ;
// Receive the reply from the camera
std::cout << "Router: Waiting for reply from camera " << std::endl;
zmq::message_t reply;
socket.recv(&reply);
std::cout << "Router: Received " << std::string(static_cast<char*>(reply.data()), reply.size()) << std::endl;
} catch (std::exception e) {
std::cout << "Router Error: " << e.what();
}
std::this_thread::sleep_for(std::chrono::seconds(1));
socket.close();
context.close();
}
Dealer.cpp
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <thread>
int main (void)
{
// Prepare our context and socket
zmq::context_t context;
zmq::socket_t socket (context, zmq::socket_type::dealer);
std::cout << "Dealer: Connecting to RunJob server… \n";
socket.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
socket.connect ("tcp://localhost:5555");
while(true) {
try {
// Wait for next request from client
std::cout << "Dealer: Waiting for request" << std::endl;
zmq::message_t request;
zmq::message_t empty;
// Receive request
socket.recv(&request);
std::string requestString = std::string(static_cast<char*>(request.data()), request.size());
std::cout << "Dealer: Received request" << std::endl;
std::cout << requestString << std::endl;
// ZMQ_SNDMORE - "Specifies that the message being sent is a multi-part message, and that further message parts are to follow"
socket.send(zmq::str_buffer("Job completed"), zmq::send_flags::dontwait);
}catch (std::exception e) {
std::cout << "Router Error: " << e.what();
}
}
// Used to set various 0MQ Socket Settings
// ZMQ_Linger - Set linger period for socket shutdown
socket.setsockopt(ZMQ_LINGER, 0);
socket.close();
context.close();
return 0;
}
我原本认为我应该在消息前加上一个空分隔符,socket.send(zmq::message_t(), ZMQ_SNDMORE);
,但这导致了错误。同样使用以下内容也会导致在 try/catch 块中抛出错误。该错误只是打印 'Unknown error':
zmq::message_t delimiter(0);
socket.send(delimiter, ZMQ_SNDMORE);
使用以下创建定界符也会导致同样的错误:
socket.send(zmq::message_t(), ZMQ_SNDMORE);
据我所知,当使用 cppzmq 时,您不需要添加空分隔符(我对此可能是错误的,但在阅读和查看其他人的示例并测试我自己的代码后,这就是我确定)。
这是一个非常基本的图表,包含此设计的最终目标:
在我的研究中,我还没有找到这段代码的好例子。 Cppzmq github 的文档和示例很少。
以下是我看过的其他一些来源:
How to send message with zeroMq using Dealer
关于 ROUTER/DEALER 模式的主要思想是它是 REPLY/REQUEST 的异步概括。然而,您正试图反转模式中的套接字,发现它不适合并扭曲代码以尝试使其适合。不要那样做。
你需要做的是"go with the flow"。在存在示例的简单方法中,经销商应发送第一条消息。 ROUTER 然后对此做出响应。
下一级是经销商在其启动消息中标识自己。 ROUTER 然后可以给那个 DEALER 一个特定的响应。
在下一级别,您可以实现真正的异步。 ROUTER 可以获取每个 DEALER 的标识消息的副本,并使用消息副本随时向任何 DEALER 发送异步消息。识别消息的一份副本将附加 "PEER2" 帧并发送给经销商。这是有效的,因为消息的副本包括路由帧。理想情况下,您还可以去除 'message' 帧,只在副本中留下路由帧。
警告 - 我不使用 cppzmq,我使用 CZMQ。我可以说使用 CZMQ 这种帧操作非常容易。