ZeroMQ Pub/Sub 操作队列中的最后一个元素和其他元素
ZeroMQ Pub/Sub action last element in queue an other elements
我开始使用 zeromq
和 python 以及 Publisher/Subscriber 参考。但是,我没有找到任何关于如何处理队列中消息的文档。我想将最后收到的消息与队列中的其他元素区别对待。
例子
publisher.py
import zmq
import random
import time
port = "5556"
topic = "1"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
while True:
messagedata = random.randrange(1,215)
print "%s %d" % (topic, messagedata)
socket.send("%s %d" % (topic, messagedata))
time.sleep(.2)
subscriber.py
import zmq
port = "5556"
topic = "1"
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Connecting..."
socket.connect ("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,topic)
while True:
if isLastMessage(): # probably based on socket.recv()
analysis_function() # time consuming function
else:
simple_function() # something simple like print and save in memory
我只想知道如何创建 subscriber.py
文件中描述的 isLastMessage()
函数。如果直接在 zeromq
或解决方法中有一些东西。
如果 isLastMessage()
旨在识别 publisher.py
生成的消息流中的最后一条消息,那么这是不可能的,因为没有最后一条消息。 publisher.py
产生无限量的消息!
然而,如果 publisher.py
知道它的最后一个 "real" 消息,即没有 while True:
,它可以在之后发送 "I am done" 消息。在 subscriber.py
中识别是微不足道的。
对不起,我会保留这个问题以供参考。我刚刚找到答案,在文档中有一个 NOBLOCK
标志,您可以将其添加到接收器。这样 recv
命令就不会阻塞。从 answer 的一部分中提取的一个简单解决方法如下:
while True:
try:
#check for a message, this will not block
message = socket.recv(flags=zmq.NOBLOCK)
#a message has been received
print "Message received:", message
except zmq.Again as e:
print "No message received yet"
至于真正的实现,人们不确定这是最后一次使用标志 NOBLOCK
的调用,并且一旦您进入 exception
块。 Wich 翻译成如下内容:
msg = subscribe(in_socket)
is_last = False
while True:
if is_last:
msg = subscribe(in_socket)
is_last = False
else:
try:
old_msg = msg
msg = subscribe(in_socket,flags=zmq.NOBLOCK)
# if new message was received, then process the old message
process_not_last(old_msg)
except zmq.Again as e:
process_last(msg)
is_last = True # it is probably the last message
欢迎来到非阻塞消息/信令的世界
这是任何严肃的分布式系统设计的基本特征。
如果您假设 "last" 消息通过管道中没有另一个消息,那么 Poller()
实例可能有助于您的主事件循环,您可以在其中控制时间"wait"-a-bit 在考虑管道 "empty" 之前,不要用零等待自旋循环破坏你的 IO 资源。
显式信号总是更好(如果你可以设计远程端行为)
在接收方有零知识,接收到的"last"消息的上下文是什么(并且建议从消息发送方广播显式信号),但是有是与此相反的功能——指示 ZeroMQ 原型 "internally"-丢弃所有此类消息,而不是 "last"-消息,从而减少接收方处理以纠正 "last"-有消息。
aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )
如果您可能想阅读更多关于 ZeroMQ 模式和反模式的内容,请不要错过 Pieter HINTJENS 的精彩著作 "Code Connected, Volume 1"(也是 pdf 格式)并且可能喜欢 distributed-computing using 的更广阔视野
我开始使用 zeromq
和 python 以及 Publisher/Subscriber 参考。但是,我没有找到任何关于如何处理队列中消息的文档。我想将最后收到的消息与队列中的其他元素区别对待。
例子
publisher.py
import zmq
import random
import time
port = "5556"
topic = "1"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
while True:
messagedata = random.randrange(1,215)
print "%s %d" % (topic, messagedata)
socket.send("%s %d" % (topic, messagedata))
time.sleep(.2)
subscriber.py
import zmq
port = "5556"
topic = "1"
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Connecting..."
socket.connect ("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,topic)
while True:
if isLastMessage(): # probably based on socket.recv()
analysis_function() # time consuming function
else:
simple_function() # something simple like print and save in memory
我只想知道如何创建 subscriber.py
文件中描述的 isLastMessage()
函数。如果直接在 zeromq
或解决方法中有一些东西。
如果 isLastMessage()
旨在识别 publisher.py
生成的消息流中的最后一条消息,那么这是不可能的,因为没有最后一条消息。 publisher.py
产生无限量的消息!
然而,如果 publisher.py
知道它的最后一个 "real" 消息,即没有 while True:
,它可以在之后发送 "I am done" 消息。在 subscriber.py
中识别是微不足道的。
对不起,我会保留这个问题以供参考。我刚刚找到答案,在文档中有一个 NOBLOCK
标志,您可以将其添加到接收器。这样 recv
命令就不会阻塞。从 answer 的一部分中提取的一个简单解决方法如下:
while True:
try:
#check for a message, this will not block
message = socket.recv(flags=zmq.NOBLOCK)
#a message has been received
print "Message received:", message
except zmq.Again as e:
print "No message received yet"
至于真正的实现,人们不确定这是最后一次使用标志 NOBLOCK
的调用,并且一旦您进入 exception
块。 Wich 翻译成如下内容:
msg = subscribe(in_socket)
is_last = False
while True:
if is_last:
msg = subscribe(in_socket)
is_last = False
else:
try:
old_msg = msg
msg = subscribe(in_socket,flags=zmq.NOBLOCK)
# if new message was received, then process the old message
process_not_last(old_msg)
except zmq.Again as e:
process_last(msg)
is_last = True # it is probably the last message
欢迎来到非阻塞消息/信令的世界
这是任何严肃的分布式系统设计的基本特征。
如果您假设 "last" 消息通过管道中没有另一个消息,那么 Poller()
实例可能有助于您的主事件循环,您可以在其中控制时间"wait"-a-bit 在考虑管道 "empty" 之前,不要用零等待自旋循环破坏你的 IO 资源。
显式信号总是更好(如果你可以设计远程端行为)
在接收方有零知识,接收到的"last"消息的上下文是什么(并且建议从消息发送方广播显式信号),但是有是与此相反的功能——指示 ZeroMQ 原型 "internally"-丢弃所有此类消息,而不是 "last"-消息,从而减少接收方处理以纠正 "last"-有消息。
aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )
如果您可能想阅读更多关于 ZeroMQ 模式和反模式的内容,请不要错过 Pieter HINTJENS 的精彩著作 "Code Connected, Volume 1"(也是 pdf 格式)并且可能喜欢 distributed-computing using