理解ZMQ的HWM
Understanding ZMQ's HWM
我在理解 ZeroMQ 高水位线 (HWM) 队列的工作原理时遇到一些问题。
我在下面附上了两个脚本,重现了以下内容。
- 建立 PUSH/PULL 连接,将所有 HWM 队列设置为 1。
- 让 puller 休眠一段时间。
- 从推送器发送 2200 条消息。
- puller唤醒后,接收2200条消息并打印
我得到的结果是拉取器能够成功接收(打印)所有消息。此外,推送器似乎几乎立即完成执行。根据 ZMQ official documentation 我期望的是推动器不会在拉动器醒来之前完成执行,因为由于达到 HWM 而在第二次 send(...)
调用时被阻止。我还尝试在每次 send(...)
调用之间添加 0.001 秒的睡眠,结果相同。
那么,我的问题是:
- 为什么在达到 HWM(大小 1)后,推送器在对
send(...)
的第二次调用中没有阻塞?
- 推送器和拉取器中的消息存储在哪里?
- HWM 大小与存储的消息数量之间是否存在直接关系?
脚本:
pusher.py
import zmq
context = zmq.Context()
push_socket = context.socket(zmq.PUSH)
push_socket.setsockopt(zmq.SNDHWM, 1)
push_socket.setsockopt(zmq.RCVHWM, 1)
push_socket.bind("tcp://127.0.0.1:5557")
print(push_socket.get_hwm()) # Prints 1
print('Sending all messages')
for i in range(2200):
push_socket.send(str(i).encode('ascii'))
print('Finished execution...')
puller.py
import zmq
import time
context = zmq.Context()
pull_socket = context.socket(zmq.PULL)
pull_socket.setsockopt(zmq.RCVHWM, 1)
pull_socket.setsockopt(zmq.SNDHWM, 1)
pull_socket.connect("tcp://127.0.0.1:5557")
print(pull_socket.get_hwm()) # Prints 1
print('Connected, but not receiving yet... (Sleep 4s)')
time.sleep(4)
print('Receiving everything now!')
rec = ''
for i in range(2200):
rec += '{} '.format(pull_socket.recv().decode('ascii'))
print(rec) # Prints `0 1 2 ... 2198 2199 `
为了重现我的测试用例,打开两个终端并在一个终端中先启动 puller.py,然后在另一个终端中快速启动(4 秒 window)pusher.py。
这里至少涉及4个缓冲区:zmq发送缓冲区,OS写入tcp缓冲区,OS读取tcp缓冲区,zmq接收缓冲区。
当消息成功写入 OS tcp 写入缓冲区时,zmq io 线程将消息标记为 "sent"。这些消息现在被视为 "in transit"。
然后网络堆栈负责将尽可能多的数据传输到其他进程的匹配 OS recv 缓冲区中,
最后,接收 zmq io 线程一次最多从该缓冲区读取 HWM 消息到 ZMQ 接收队列。
OS 缓冲区默认情况下通常在 10-100kb 左右,并且在 ZMQ 甚至注意到对方没有使用任何消息之前,这两个缓冲区都可以完全填满 "in transit" 消息.出于性能原因,这些缓冲区是必需的 - 您不能只是摆脱它们。
您的问题的解决方案可能涉及 req/rep 套接字和显式应用程序级确认,即指南中的懒惰盗版模式。
我在理解 ZeroMQ 高水位线 (HWM) 队列的工作原理时遇到一些问题。
我在下面附上了两个脚本,重现了以下内容。
- 建立 PUSH/PULL 连接,将所有 HWM 队列设置为 1。
- 让 puller 休眠一段时间。
- 从推送器发送 2200 条消息。
- puller唤醒后,接收2200条消息并打印
我得到的结果是拉取器能够成功接收(打印)所有消息。此外,推送器似乎几乎立即完成执行。根据 ZMQ official documentation 我期望的是推动器不会在拉动器醒来之前完成执行,因为由于达到 HWM 而在第二次 send(...)
调用时被阻止。我还尝试在每次 send(...)
调用之间添加 0.001 秒的睡眠,结果相同。
那么,我的问题是:
- 为什么在达到 HWM(大小 1)后,推送器在对
send(...)
的第二次调用中没有阻塞? - 推送器和拉取器中的消息存储在哪里?
- HWM 大小与存储的消息数量之间是否存在直接关系?
脚本:
pusher.py
import zmq
context = zmq.Context()
push_socket = context.socket(zmq.PUSH)
push_socket.setsockopt(zmq.SNDHWM, 1)
push_socket.setsockopt(zmq.RCVHWM, 1)
push_socket.bind("tcp://127.0.0.1:5557")
print(push_socket.get_hwm()) # Prints 1
print('Sending all messages')
for i in range(2200):
push_socket.send(str(i).encode('ascii'))
print('Finished execution...')
puller.py
import zmq
import time
context = zmq.Context()
pull_socket = context.socket(zmq.PULL)
pull_socket.setsockopt(zmq.RCVHWM, 1)
pull_socket.setsockopt(zmq.SNDHWM, 1)
pull_socket.connect("tcp://127.0.0.1:5557")
print(pull_socket.get_hwm()) # Prints 1
print('Connected, but not receiving yet... (Sleep 4s)')
time.sleep(4)
print('Receiving everything now!')
rec = ''
for i in range(2200):
rec += '{} '.format(pull_socket.recv().decode('ascii'))
print(rec) # Prints `0 1 2 ... 2198 2199 `
为了重现我的测试用例,打开两个终端并在一个终端中先启动 puller.py,然后在另一个终端中快速启动(4 秒 window)pusher.py。
这里至少涉及4个缓冲区:zmq发送缓冲区,OS写入tcp缓冲区,OS读取tcp缓冲区,zmq接收缓冲区。
当消息成功写入 OS tcp 写入缓冲区时,zmq io 线程将消息标记为 "sent"。这些消息现在被视为 "in transit"。
然后网络堆栈负责将尽可能多的数据传输到其他进程的匹配 OS recv 缓冲区中, 最后,接收 zmq io 线程一次最多从该缓冲区读取 HWM 消息到 ZMQ 接收队列。
OS 缓冲区默认情况下通常在 10-100kb 左右,并且在 ZMQ 甚至注意到对方没有使用任何消息之前,这两个缓冲区都可以完全填满 "in transit" 消息.出于性能原因,这些缓冲区是必需的 - 您不能只是摆脱它们。
您的问题的解决方案可能涉及 req/rep 套接字和显式应用程序级确认,即指南中的懒惰盗版模式。