如何使用 CZMQ-4.0.2 新的 zsock API 创建发布/订阅架构?

How to create a Publish / Subscribe architecture using the CZMQ-4.0.2 new zsock API?

我想使用 CZMQ-4.0.2 创建发布/订阅架构,但我无法理解如何使用新的 zsock API .

谁能给我指出一些使用新 API 的示例?

tldr;

Examples are on the bottom of the site

一点解释

我假设您是在询问 CZMQ 的特定用法,而不是如何使用 ZeroMQ 套接字,以及 PUB/SUB 模式的怪癖是什么。

使用 CZMQ 时,您无需担心上下文,它是在内部完成的。 zsock_new 函数族 returns 指向 zsock_t 的指针,套接字的不透明标识符。您需要记得在完成后调用 zsock_destroy(&socket),以避免内存泄漏。

在最常见的用法中,您无需担心连接和绑定,因为 zsock_new_XXX 会处理这些。要了解采取了什么操作,您可以查看 manual.

//  Create a PUB socket. Default action is bind.
CZMQ_EXPORT zsock_t *
    zsock_new_pub (const char *endpoint);
//  Create a SUB socket, and optionally subscribe to some prefix string. Default
//  action is connect.
CZMQ_EXPORT zsock_t *
    zsock_new_sub (const char *endpoint, const char *subscribe);

如果您打算做一些不寻常的事情 binding/connecting,您可以为 endpoint 添加一个前缀。 @表示绑定,>连接。

zsock_t *sock = zsock_new_push("@ipc://test");

现在,您可以使用多种方法发送消息 (zsock_sendzmsg_sendzstr_sendzstr_sendxzstr_sendfzframe_send),最通用的是 zsock_send。它有类似 printf 的原型,您需要在其中传递消息的 图片 。此字符串中的每个字符代表消息中的单个帧(或多个帧,因为您还可以传递另一条消息)。它在 here:

中描述
//  Send a 'picture' message to the socket (or actor). The picture is a
//  string that defines the type of each frame. This makes it easy to send
//  a complex multiframe message in one call. The picture can contain any
//  of these characters, each corresponding to one or two arguments:
//
//      i = int (signed)
//      1 = uint8_t
//      2 = uint16_t
//      4 = uint32_t
//      8 = uint64_t
//      s = char *
//      b = byte *, size_t (2 arguments)
//      c = zchunk_t *
//      f = zframe_t *
//      h = zhashx_t *
//      U = zuuid_t *
//      p = void * (sends the pointer value, only meaningful over inproc)
//      m = zmsg_t * (sends all frames in the zmsg)
//      z = sends zero-sized frame (0 arguments)
//      u = uint (deprecated)
//
//  Note that s, b, c, and f are encoded the same way and the choice is
//  offered as a convenience to the sender, which may or may not already
//  have data in a zchunk or zframe. Does not change or take ownership of
//  any arguments. Returns 0 if successful, -1 if sending failed for any
//  reason.
CZMQ_EXPORT int
zsock_send (void *self, const char *picture, ...);

一个可能不清楚的是这个void *self,它实际上是我们从zsock_new返回的zsock_t *。在原型中,它被声明为 void * 因为这个函数也接受 zactor_t *.

重要:Does not change or take ownership of any arguments.。发送后需要free/destroy数据

接收看起来非常相似。它就像 sscanf,并且 zsock_recv 创建对象,所以同样,您需要注意内存。

ZeroMQ 和 CZMQ 在行为上的最大区别是 LINGER 套接字选项。对于 ZeroMQ,它是无限的 (-1),其中 CZMQ 的默认值为 0(无阻塞)。因此,当 zsock_send 后跟 zsock_destroy 时,您的邮件可能无法送达。可以使用 zsock_set_linger 或全局 zsys_set_linger.

为套接字单独设置延迟值

发布者示例

#include <czmq.h>

int main(int argc, char ** argv) {
  zsock_t *socket = zsock_new_pub("ipc://example.sock");
  assert(socket);

  while(!zsys_interrupted) {
    zsys_info("Publishing");
    zsock_send(socket, "sss", "TOPIC", "MESSAGE PART", "ANOTHER");
    zclock_sleep(1000);
  }

  zsock_destroy(&socket);
  return 0;
}

订阅者示例

#include <czmq.h>

int main(int argc, char ** argv) {
  zsock_t *socket = zsock_new_sub("ipc://example.sock", "TOPIC");
  assert(socket);

  char *topic;
  char *frame;
  zmsg_t *msg;
  int rc = zsock_recv(socket, "sm", &topic, &msg);
  assert(rc == 0);

  zsys_info("Recv on %s", topic);
  while(frame = zmsg_popstr(msg)) {
    zsys_info("> %s", frame);
    free(frame);
  }
  free(topic);
  zmsg_destroy(&msg);

  zsock_destroy(&socket);
  return 0;
}