保留消息直到使用 Python + Stomp 进行订阅

Retain Messages until a Subscription is Made using Python + Stomp

我目前正在编写两个脚本来使用 stomp 客户端库订阅消息服务器,write.py 来编写data 和 read.py 获取数据。

如果我先开始read.py然后运行write.py,write.py 正确接收消息。

但是,如果我先运行write.py然后运行read.py, read.py 不检索任何以前发送到服务器的消息。

以下是脚本的相关部分。

如何实现 write.py 放入队列的消息一直保留到 read.py订阅并检索它们?

write.py

def writeMQ(msg):
    queue = '/topic/test'
    conn = stomp.Connection(host_and_ports=[(MQ_SERVER, MQ_PORT)])
    try:
        conn.start()
        conn.connect(MQ_USER, MQ_PASSWD, wait=True)
        conn.send(body=msg, destination=queue, persistent=True)
    except:
        traceback.print_exc()
    finally:
        conn.disconnect()

    return

read.py

class MyListener(stomp.ConnectionListener):    
    def on_error(self, headers, message):    
        print ('received an error {0}'.format(message))    

    def on_message(self, headers, message):    
        print ('received an message {0}'.format(message))    


def readMQ():                                                
    queue = '/topic/test'     
    conn = stomp.Connection(host_and_ports=[(MQ_SERVER, MQ_PORT)])    
    try:                                                              
        conn.set_listener("", MyListener())                           
        conn.start()                                                  
        conn.connect(MQ_USER, MQ_PASSWD, wait=True)    

        conn.subscribe(destination=queue, ack="auto", id=1)                       


        stop = raw_input()                  
    except:                   
        traceback.print_exc()    
    finally:                     
        conn.disconnect()        

    return                   

问题是消息正在发送到主题。

Apollo Documentation描述主题和queue的区别如下:

Queues hold on to unconsumed messages even when there are no subscriptions attached, while a topic will drop messages when there are no connected subscriptions.

因此,当read.py首先启动并监听时,主题识别订阅并转发消息。但是当 write.py 首先启动时,消息被丢弃,因为没有订阅的客户端。

因此您可以使用 queue 而不是主题。如果服务器能够静默创建 queue,只需设置

queue = '/queue/test' .

不知道用的是哪个版本的stomp,但是找不到参数

send(..., persistent=True) .

无论如何,坚持不是正确的方法,因为它仍然不允许简单地保留消息以供以后连接,但会在服务器出现故障时保存消息。

您可以使用

retain:set

header 用于主题消息。