如何使用 CZMQ-4.0.2 新的 zsock API 创建发布/订阅架构?
How to create a Publish / Subscribe architecture using the CZMQ-4.0.2 new zsock API?
我想使用 CZMQ-4.0.2 创建发布/订阅架构,但我无法理解如何使用新的 zsock
API .
谁能给我指出一些使用新 API 的示例?
tldr;
Examples are on the bottom of the site
一点解释
我假设您是在询问 CZMQ 的特定用法,而不是如何使用 ZeroMQ 套接字,以及 PUB/SUB 模式的怪癖是什么。
使用 CZMQ 时,您无需担心上下文,它是在内部完成的。 zsock_new
函数族 returns 指向 zsock_t
的指针,套接字的不透明标识符。您需要记得在完成后调用 zsock_destroy(&socket)
,以避免内存泄漏。
在最常见的用法中,您无需担心连接和绑定,因为 zsock_new_XXX
会处理这些。要了解采取了什么操作,您可以查看 manual.
// Create a PUB socket. Default action is bind.
CZMQ_EXPORT zsock_t *
zsock_new_pub (const char *endpoint);
// Create a SUB socket, and optionally subscribe to some prefix string. Default
// action is connect.
CZMQ_EXPORT zsock_t *
zsock_new_sub (const char *endpoint, const char *subscribe);
如果您打算做一些不寻常的事情 binding/connecting,您可以为 endpoint
添加一个前缀。 @
表示绑定,>
连接。
zsock_t *sock = zsock_new_push("@ipc://test");
现在,您可以使用多种方法发送消息 (zsock_send
、zmsg_send
、zstr_send
、zstr_sendx
、zstr_sendf
、zframe_send
),最通用的是 zsock_send
。它有类似 printf 的原型,您需要在其中传递消息的 图片 。此字符串中的每个字符代表消息中的单个帧(或多个帧,因为您还可以传递另一条消息)。它在 here:
中描述
// Send a 'picture' message to the socket (or actor). The picture is a
// string that defines the type of each frame. This makes it easy to send
// a complex multiframe message in one call. The picture can contain any
// of these characters, each corresponding to one or two arguments:
//
// i = int (signed)
// 1 = uint8_t
// 2 = uint16_t
// 4 = uint32_t
// 8 = uint64_t
// s = char *
// b = byte *, size_t (2 arguments)
// c = zchunk_t *
// f = zframe_t *
// h = zhashx_t *
// U = zuuid_t *
// p = void * (sends the pointer value, only meaningful over inproc)
// m = zmsg_t * (sends all frames in the zmsg)
// z = sends zero-sized frame (0 arguments)
// u = uint (deprecated)
//
// Note that s, b, c, and f are encoded the same way and the choice is
// offered as a convenience to the sender, which may or may not already
// have data in a zchunk or zframe. Does not change or take ownership of
// any arguments. Returns 0 if successful, -1 if sending failed for any
// reason.
CZMQ_EXPORT int
zsock_send (void *self, const char *picture, ...);
一个可能不清楚的是这个void *self
,它实际上是我们从zsock_new返回的zsock_t *
。在原型中,它被声明为 void *
因为这个函数也接受 zactor_t *
.
重要:Does not change or take ownership of any arguments.
。发送后需要free/destroy数据
接收看起来非常相似。它就像 sscanf,并且 zsock_recv
创建对象,所以同样,您需要注意内存。
ZeroMQ 和 CZMQ 在行为上的最大区别是 LINGER 套接字选项。对于 ZeroMQ,它是无限的 (-1),其中 CZMQ 的默认值为 0(无阻塞)。因此,当 zsock_send
后跟 zsock_destroy
时,您的邮件可能无法送达。可以使用 zsock_set_linger
或全局 zsys_set_linger
.
为套接字单独设置延迟值
发布者示例
#include <czmq.h>
int main(int argc, char ** argv) {
zsock_t *socket = zsock_new_pub("ipc://example.sock");
assert(socket);
while(!zsys_interrupted) {
zsys_info("Publishing");
zsock_send(socket, "sss", "TOPIC", "MESSAGE PART", "ANOTHER");
zclock_sleep(1000);
}
zsock_destroy(&socket);
return 0;
}
订阅者示例
#include <czmq.h>
int main(int argc, char ** argv) {
zsock_t *socket = zsock_new_sub("ipc://example.sock", "TOPIC");
assert(socket);
char *topic;
char *frame;
zmsg_t *msg;
int rc = zsock_recv(socket, "sm", &topic, &msg);
assert(rc == 0);
zsys_info("Recv on %s", topic);
while(frame = zmsg_popstr(msg)) {
zsys_info("> %s", frame);
free(frame);
}
free(topic);
zmsg_destroy(&msg);
zsock_destroy(&socket);
return 0;
}
我想使用 CZMQ-4.0.2 创建发布/订阅架构,但我无法理解如何使用新的 zsock
API .
谁能给我指出一些使用新 API 的示例?
tldr;
Examples are on the bottom of the site
一点解释
我假设您是在询问 CZMQ 的特定用法,而不是如何使用 ZeroMQ 套接字,以及 PUB/SUB 模式的怪癖是什么。
使用 CZMQ 时,您无需担心上下文,它是在内部完成的。 zsock_new
函数族 returns 指向 zsock_t
的指针,套接字的不透明标识符。您需要记得在完成后调用 zsock_destroy(&socket)
,以避免内存泄漏。
在最常见的用法中,您无需担心连接和绑定,因为 zsock_new_XXX
会处理这些。要了解采取了什么操作,您可以查看 manual.
// Create a PUB socket. Default action is bind. CZMQ_EXPORT zsock_t * zsock_new_pub (const char *endpoint); // Create a SUB socket, and optionally subscribe to some prefix string. Default // action is connect. CZMQ_EXPORT zsock_t * zsock_new_sub (const char *endpoint, const char *subscribe);
如果您打算做一些不寻常的事情 binding/connecting,您可以为 endpoint
添加一个前缀。 @
表示绑定,>
连接。
zsock_t *sock = zsock_new_push("@ipc://test");
现在,您可以使用多种方法发送消息 (zsock_send
、zmsg_send
、zstr_send
、zstr_sendx
、zstr_sendf
、zframe_send
),最通用的是 zsock_send
。它有类似 printf 的原型,您需要在其中传递消息的 图片 。此字符串中的每个字符代表消息中的单个帧(或多个帧,因为您还可以传递另一条消息)。它在 here:
// Send a 'picture' message to the socket (or actor). The picture is a // string that defines the type of each frame. This makes it easy to send // a complex multiframe message in one call. The picture can contain any // of these characters, each corresponding to one or two arguments: // // i = int (signed) // 1 = uint8_t // 2 = uint16_t // 4 = uint32_t // 8 = uint64_t // s = char * // b = byte *, size_t (2 arguments) // c = zchunk_t * // f = zframe_t * // h = zhashx_t * // U = zuuid_t * // p = void * (sends the pointer value, only meaningful over inproc) // m = zmsg_t * (sends all frames in the zmsg) // z = sends zero-sized frame (0 arguments) // u = uint (deprecated) // // Note that s, b, c, and f are encoded the same way and the choice is // offered as a convenience to the sender, which may or may not already // have data in a zchunk or zframe. Does not change or take ownership of // any arguments. Returns 0 if successful, -1 if sending failed for any // reason. CZMQ_EXPORT int zsock_send (void *self, const char *picture, ...);
一个可能不清楚的是这个void *self
,它实际上是我们从zsock_new返回的zsock_t *
。在原型中,它被声明为 void *
因为这个函数也接受 zactor_t *
.
重要:Does not change or take ownership of any arguments.
。发送后需要free/destroy数据
接收看起来非常相似。它就像 sscanf,并且 zsock_recv
创建对象,所以同样,您需要注意内存。
ZeroMQ 和 CZMQ 在行为上的最大区别是 LINGER 套接字选项。对于 ZeroMQ,它是无限的 (-1),其中 CZMQ 的默认值为 0(无阻塞)。因此,当 zsock_send
后跟 zsock_destroy
时,您的邮件可能无法送达。可以使用 zsock_set_linger
或全局 zsys_set_linger
.
发布者示例
#include <czmq.h>
int main(int argc, char ** argv) {
zsock_t *socket = zsock_new_pub("ipc://example.sock");
assert(socket);
while(!zsys_interrupted) {
zsys_info("Publishing");
zsock_send(socket, "sss", "TOPIC", "MESSAGE PART", "ANOTHER");
zclock_sleep(1000);
}
zsock_destroy(&socket);
return 0;
}
订阅者示例
#include <czmq.h>
int main(int argc, char ** argv) {
zsock_t *socket = zsock_new_sub("ipc://example.sock", "TOPIC");
assert(socket);
char *topic;
char *frame;
zmsg_t *msg;
int rc = zsock_recv(socket, "sm", &topic, &msg);
assert(rc == 0);
zsys_info("Recv on %s", topic);
while(frame = zmsg_popstr(msg)) {
zsys_info("> %s", frame);
free(frame);
}
free(topic);
zmsg_destroy(&msg);
zsock_destroy(&socket);
return 0;
}