ZeroMQ Pub/Sub 操作队列中的最后一个元素和其他元素

ZeroMQ Pub/Sub action last element in queue an other elements

我开始使用 zeromq 和 python 以及 Publisher/Subscriber 参考。但是,我没有找到任何关于如何处理队列中消息的文档。我想将最后收到的消息与队列中的其他元素区别对待。

例子

publisher.py

import zmq
import random
import time

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    messagedata = random.randrange(1,215)
    print "%s %d" % (topic, messagedata)
    socket.send("%s %d" % (topic, messagedata))
    time.sleep(.2)

subscriber.py

import zmq

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Connecting..."
socket.connect ("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,topic)

while True:
    if isLastMessage(): # probably based on socket.recv()
         analysis_function() # time consuming function
    else:
         simple_function()  # something simple like print and save in memory

我只想知道如何创建 subscriber.py 文件中描述的 isLastMessage() 函数。如果直接在 zeromq 或解决方法中有一些东西。

如果 isLastMessage() 旨在识别 publisher.py 生成的消息流中的最后一条消息,那么这是不可能的,因为没有最后一条消息。 publisher.py 产生无限量的消息!

然而,如果 publisher.py 知道它的最后一个 "real" 消息,即没有 while True:,它可以在之后发送 "I am done" 消息。在 subscriber.py 中识别是微不足道的。

对不起,我会保留这个问题以供参考。我刚刚找到答案,在文档中有一个 NOBLOCK 标志,您可以将其添加到接收器。这样 recv 命令就不会阻塞。从 answer 的一部分中提取的一个简单解决方法如下:

while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        #a message has been received
        print "Message received:", message

    except zmq.Again as e:
        print "No message received yet"

至于真正的实现,人们不确定这是最后一次使用标志 NOBLOCK 的调用,并且一旦您进入 exception 块。 Wich 翻译成如下内容:

msg = subscribe(in_socket)
is_last = False
while True:
    if is_last:
        msg = subscribe(in_socket)
        is_last = False
    else:
        try:
            old_msg = msg
            msg = subscribe(in_socket,flags=zmq.NOBLOCK)
            # if new message was received, then process the old message
            process_not_last(old_msg)
        except zmq.Again as e:
            process_last(msg)
            is_last = True  # it is probably the last message

欢迎来到非阻塞消息/信令的世界

这是任何严肃的分布式系统设计的基本特征。

如果您假设 "last" 消息通过管道中没有另一个消息,那么 Poller() 实例可能有助于您的主事件循环,您可以在其中控制时间"wait"-a-bit 在考虑管道 "empty" 之前,不要用零等待自旋循环破坏你的 IO 资源。

显式信号总是更好(如果你可以设计远程端行为)

在接收方有零知识,接收到的"last"消息的上下文是什么(并且建议从消息发送方广播显式信号),但是有是与此相反的功能——指示 ZeroMQ 原型 "internally"-丢弃所有此类消息,而不是 "last"-消息,从而减少接收方处理以纠正 "last"-有消息。

aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )

如果您可能想阅读更多关于 ZeroMQ 模式和反模式的内容,请不要错过 Pieter HINTJENS 的精彩著作 "Code Connected, Volume 1"(也是 pdf 格式)并且可能喜欢 using 的更广阔视野