保留消息直到使用 Python + Stomp 进行订阅
Retain Messages until a Subscription is Made using Python + Stomp
我目前正在编写两个脚本来使用 stomp 客户端库订阅消息服务器,write.py 来编写data 和 read.py 获取数据。
如果我先开始read.py然后运行write.py,write.py 正确接收消息。
但是,如果我先运行write.py然后运行read.py, read.py 不检索任何以前发送到服务器的消息。
以下是脚本的相关部分。
如何实现 write.py 放入队列的消息一直保留到 read.py订阅并检索它们?
write.py
def writeMQ(msg):
queue = '/topic/test'
conn = stomp.Connection(host_and_ports=[(MQ_SERVER, MQ_PORT)])
try:
conn.start()
conn.connect(MQ_USER, MQ_PASSWD, wait=True)
conn.send(body=msg, destination=queue, persistent=True)
except:
traceback.print_exc()
finally:
conn.disconnect()
return
read.py
class MyListener(stomp.ConnectionListener):
def on_error(self, headers, message):
print ('received an error {0}'.format(message))
def on_message(self, headers, message):
print ('received an message {0}'.format(message))
def readMQ():
queue = '/topic/test'
conn = stomp.Connection(host_and_ports=[(MQ_SERVER, MQ_PORT)])
try:
conn.set_listener("", MyListener())
conn.start()
conn.connect(MQ_USER, MQ_PASSWD, wait=True)
conn.subscribe(destination=queue, ack="auto", id=1)
stop = raw_input()
except:
traceback.print_exc()
finally:
conn.disconnect()
return
问题是消息正在发送到主题。
Apollo Documentation描述主题和queue的区别如下:
Queues hold on to unconsumed messages even when there are no subscriptions attached, while a topic will drop messages when there are no connected subscriptions.
因此,当read.py首先启动并监听时,主题识别订阅并转发消息。但是当 write.py 首先启动时,消息被丢弃,因为没有订阅的客户端。
因此您可以使用 queue 而不是主题。如果服务器能够静默创建 queue,只需设置
queue = '/queue/test' .
不知道用的是哪个版本的stomp,但是找不到参数
send(..., persistent=True) .
无论如何,坚持不是正确的方法,因为它仍然不允许简单地保留消息以供以后连接,但会在服务器出现故障时保存消息。
您可以使用
retain:set
header 用于主题消息。
我目前正在编写两个脚本来使用 stomp 客户端库订阅消息服务器,write.py 来编写data 和 read.py 获取数据。
如果我先开始read.py然后运行write.py,write.py 正确接收消息。
但是,如果我先运行write.py然后运行read.py, read.py 不检索任何以前发送到服务器的消息。
以下是脚本的相关部分。
如何实现 write.py 放入队列的消息一直保留到 read.py订阅并检索它们?
write.py
def writeMQ(msg):
queue = '/topic/test'
conn = stomp.Connection(host_and_ports=[(MQ_SERVER, MQ_PORT)])
try:
conn.start()
conn.connect(MQ_USER, MQ_PASSWD, wait=True)
conn.send(body=msg, destination=queue, persistent=True)
except:
traceback.print_exc()
finally:
conn.disconnect()
return
read.py
class MyListener(stomp.ConnectionListener):
def on_error(self, headers, message):
print ('received an error {0}'.format(message))
def on_message(self, headers, message):
print ('received an message {0}'.format(message))
def readMQ():
queue = '/topic/test'
conn = stomp.Connection(host_and_ports=[(MQ_SERVER, MQ_PORT)])
try:
conn.set_listener("", MyListener())
conn.start()
conn.connect(MQ_USER, MQ_PASSWD, wait=True)
conn.subscribe(destination=queue, ack="auto", id=1)
stop = raw_input()
except:
traceback.print_exc()
finally:
conn.disconnect()
return
问题是消息正在发送到主题。
Apollo Documentation描述主题和queue的区别如下:
Queues hold on to unconsumed messages even when there are no subscriptions attached, while a topic will drop messages when there are no connected subscriptions.
因此,当read.py首先启动并监听时,主题识别订阅并转发消息。但是当 write.py 首先启动时,消息被丢弃,因为没有订阅的客户端。
因此您可以使用 queue 而不是主题。如果服务器能够静默创建 queue,只需设置
queue = '/queue/test' .
不知道用的是哪个版本的stomp,但是找不到参数
send(..., persistent=True) .
无论如何,坚持不是正确的方法,因为它仍然不允许简单地保留消息以供以后连接,但会在服务器出现故障时保存消息。
您可以使用
retain:set
header 用于主题消息。