如何在没有订阅者的情况下发布
How to publish without subscriber
在对 pub/sub 和 xadd/xread 进行了一些测试后,我发现如果我的订阅者未开机,则无论何时启动订户。例如情况
- 您通过发布发送消息
- 您在通过发布发送消息后 10 秒打开您的订阅者并收听频道
- 消息将丢失。
我试过两种不同的代码,例如
Sub.py
import redis
import time
from config import configuration
client: redis = redis.Redis(
host=configuration.helheim.redis_host,
port=configuration.helheim.redis_port,
db=configuration.helheim.redis_database
)
while True:
test = client.xread({"sns": '$'}, None, 0)
print(test)
time.sleep(1)
Pub.py
import redis
from config import configuration
client: redis = redis.Redis(
host=configuration.helheim.redis_host,
port=configuration.helheim.redis_port,
db=configuration.helheim.redis_database
)
test = client.xadd("sns", {"status": "kill", "link": "https://www.sneakersnstuff.com/sv/product/49769/salomon-xa-alpine-mid-advanced"})
print(test)
Sub.py
EVENT_LISTENER.subscribe("sns")
while True:
message = EVENT_LISTENER.get_message()
if message and not message['data'] == 1:
message = json.loads(message['data'])
Pub.py
import redis
from config import configuration
client: redis = redis.Redis(
host=configuration.helheim.redis_host,
port=configuration.helheim.redis_port,
db=configuration.helheim.redis_database
)
channel = "sns"
client.publish(channel,
'{"status": "kill", "store": "sns", "link": "https://www.sneakersnstuff.com/sv/product/49769/salomon-xa-alpine-mid-advanced"}')
redis 中似乎没有保存持久的历史消息。
我的问题是,当我打开我的订阅者时,我如何才能阅读我已发布和阅读后删除的消息?
Pub/sub 从不保留消息。参见
流确实保留消息,请参阅 https://redis.io/commands/xread
问题是您正在使用带有特殊 $ id 的 xread,它只会带来您调用后添加的消息。
When blocking sometimes we want to receive just entries that are added to the stream via XADD starting from the moment we block. In such a case we are not interested in the history of already added entries. For this use case, we would have to check the stream top element ID, and use such ID in the XREAD command line. This is not clean and requires to call other commands, so instead it is possible to use the special $ ID to signal the stream that we want only the new things.
您可能想在第一次呼叫时尝试使用 0,然后使用最后的消息 ID。
如果你想避免在失败的情况下从零开始并且你不能在你的客户端中保留最后一个消息 ID,请了解 https://redis.io/topics/streams-intro#consumer-groups
在对 pub/sub 和 xadd/xread 进行了一些测试后,我发现如果我的订阅者未开机,则无论何时启动订户。例如情况
- 您通过发布发送消息
- 您在通过发布发送消息后 10 秒打开您的订阅者并收听频道
- 消息将丢失。
我试过两种不同的代码,例如
Sub.py
import redis
import time
from config import configuration
client: redis = redis.Redis(
host=configuration.helheim.redis_host,
port=configuration.helheim.redis_port,
db=configuration.helheim.redis_database
)
while True:
test = client.xread({"sns": '$'}, None, 0)
print(test)
time.sleep(1)
Pub.py
import redis
from config import configuration
client: redis = redis.Redis(
host=configuration.helheim.redis_host,
port=configuration.helheim.redis_port,
db=configuration.helheim.redis_database
)
test = client.xadd("sns", {"status": "kill", "link": "https://www.sneakersnstuff.com/sv/product/49769/salomon-xa-alpine-mid-advanced"})
print(test)
Sub.py
EVENT_LISTENER.subscribe("sns")
while True:
message = EVENT_LISTENER.get_message()
if message and not message['data'] == 1:
message = json.loads(message['data'])
Pub.py
import redis
from config import configuration
client: redis = redis.Redis(
host=configuration.helheim.redis_host,
port=configuration.helheim.redis_port,
db=configuration.helheim.redis_database
)
channel = "sns"
client.publish(channel,
'{"status": "kill", "store": "sns", "link": "https://www.sneakersnstuff.com/sv/product/49769/salomon-xa-alpine-mid-advanced"}')
redis 中似乎没有保存持久的历史消息。
我的问题是,当我打开我的订阅者时,我如何才能阅读我已发布和阅读后删除的消息?
Pub/sub 从不保留消息。参见
流确实保留消息,请参阅 https://redis.io/commands/xread
问题是您正在使用带有特殊 $ id 的 xread,它只会带来您调用后添加的消息。
When blocking sometimes we want to receive just entries that are added to the stream via XADD starting from the moment we block. In such a case we are not interested in the history of already added entries. For this use case, we would have to check the stream top element ID, and use such ID in the XREAD command line. This is not clean and requires to call other commands, so instead it is possible to use the special $ ID to signal the stream that we want only the new things.
您可能想在第一次呼叫时尝试使用 0,然后使用最后的消息 ID。
如果你想避免在失败的情况下从零开始并且你不能在你的客户端中保留最后一个消息 ID,请了解 https://redis.io/topics/streams-intro#consumer-groups