ZeroMQ:每种数据类型的套接字还是只有一个套接字?

ZeroMQ: socket per data type or just one socket?

我有一个程序可以从大约 10 个其他(传感器读取)程序(全部由我自己控制)接收信息。我现在想让他们使用 ZeroMQ.

进行交流 对于大多数队列来说,重要的是中央接收程序总是有最新的传感器数据,所有旧消息都不再重要了。如果有几条消息丢失了,我不在乎。因此,对于所有这些,我从一个单独的 PUB/SUB 套接字开始;每个程序一个。但我不确定这是否是正确的方法。据我了解,我有两个选择:

  1. 为每个程序制作一个单独的套接字并循环读取它们。这样我就可以通过套接字知道我收到的信息是什么(我通常只是发送一个 int)。
  2. 创建一个所有程序都连接到的套接字,并在每条消息中发送一个 string 告诉接收端消息的内容。

所有的连接都是在PUB/SUB的基础上,所以创建一个套接字就可以了。我只是不确定这是否是最有效的方法。

欢迎所有提示!

除非您遇到严格的实时要求,否则拥有不必要的套接字没有多大意义。 ZMQ 的公平排队应该注意给予每个传感器程序同等的关注(参见 the guide 中的图 6)

如果您的传感器程序位于通过以太网连接的其他设备上,则您的程序的最终性能将受到计算机中以太网 NIC 带宽的限制。处理单个 PULL 套接字的单线程程序很有可能能够比传输 NIC 更快地处理传入的数据。

如果是这样,那么您不妨坚持使用单个套接字并享受更简单的代码。处理多个套接字并不难,但处理一个要容易得多。例如,对于一个单一的套接字,您不必告诉每个传感器程序要连接到哪个网络端口——它可以是一个常量。

PUSH/PULL 听起来比 PUB/SUB 更适合您的情况,但这不会有太大区别。

持久度

持久性将是你的(潜在)问题。像 ZMQ 这样的事情的全部意义在于它们将按照发送消息的顺序传递消息。因此,您阅读了一条消息,就收件人而言,根据定义 "last" 消息。收件人不知道在途中是否还有另一封邮件。

这是 Actor 模型架构的一个特性(ZMQ 就是这样)。消息在传输中被缓冲,并且没有关于消息在阅读时要了解的新信息。你所知道的是它是提前发送的。没有与发送方的执行会合。

现在,您要么将其作为最后一条消息进行处理,要么等待一段时间以查看是否有另一条消息出现,然后再进行处理。最简单的做法是简单地处理每条消息,就好像它是最后一条消息一样。

将此与 通信顺序进程 架构进行对比。它与 Actor 模型架构基本相同,只是传输不缓冲消息。消息发送块,直到收件人调用消息已读。

因此,当您阅读邮件时,收件人知道这是发件人发送的最后一封邮件。发件人知道收件人在那一刻收到了它发送的消息。所以最后的知识是绝对的——收到的消息确实是最后发送的消息。

但是,除非您有相当重量级的事情要进行,否则我不会担心。即使您正在阅读的消息不是队列中的最新消息,您也很可能能够跟上传感器数据流。

你可以接近通过将发送端套接字的高水位限制设置为1来使ZMQ成为CSP。这意味着你最多可以缓冲1条消息。这与 0 不同,不幸的是,将 HWM 设置为 0 意味着 "unlimited size buffer"。

- PUB/SUB is fine and allows an easy conversion from N-sensors:1-logger into N-sensors:2+-loggers
- one might also benefit from a conceptual separation of a socket from an access-port, where more than one sockets may get connected

如何始终获得实际(最后)传感器读数:

如果由于系统集成限制而未绑定到某些早期的 ZeroMQ API,则有 一个可爱的功能正是针对此 通过 .setsockopt( ZMQ_CONFLATE, True )方法:

ZMQ_CONFLATE: Keep only last message

If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent.
Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.


关于设计困境:

除非您的实时控制稳定性引入了一些硬实时限制,否则 PUB 端可以自由决定,指示新值的频率.send()SUB(-s)。这里不需要魔法,在管理的内部传出队列上设置 ZMQ_CONFLATE 选项越少。

SUB(-s) 端接收方也将受益于在管理的内部传入队列上设置的 ZMQ_CONFLATE 选项,但是给定一组单独的 .bind()-s 实例化单独的着陆端口以传送不同的单独感官读数,您的 "last" 值将始终保持 "last"-读数。如果所有读数都进入一个共同的着陆台,您的接收过程将屏蔽掉(丢失)所有读数,但只是在 .recv()[ 之前不小心 "last" 的读数。 =50=] 发生了,这没有多大帮助,是吗?

如果需要一些 I/O-performance 相关的调整,.Context( n_IO_threads ) + ZMQ_AFFINITY-映射选项可能会增加 ioDataPump 可以利用的资源并确定其优先级,以提高 IO 性能