TTL(生存时间)如何应用于命名空间?

How is the TTL (Time to Live) applied to a namespace?

Apache Pulsar 具有官方文档 Message retention and expiry 主题下记录的 TTL 功能。但是,我无法确定在配置中的哪个位置设置了此检查的执行频率。使用标准 bin/pulsar standalone 命令,自定义命名空间,配置为 5 秒的 ttl bin/pulsar-admin namespaces set-message-ttl public/ttl-test --messageTTL 5.

我可以看到消息仅在设置的时间间隔后过期,并且以下日志消息打印到控制台:

15:11:59.337 [pulsar-msg-expiry-monitor-52-1] INFO org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [persistent://public/ttl-test/my-topic][spark-shell] Starting message expiry check, ttl= 5 seconds

我的问题的症结在于:我怎样才能提高检查消息是否超过 TTL 的速度?

代理中的配置 messageExpiryCheckIntervalInMinutes 确定命名空间主题检查过期消息的频率。

根据 configuration

上的官方文档

使用set-message-ttl命令并指定命名空间名称(对于持久主题默认为public/default)和时间。

bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 120

生产者和消费者实现ttl(python客户端)的示例代码

import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic-reader1')

producer.send(('Hello-Pulsar1').encode('utf-8'))
producer.send(('Hello-Pulsar2').encode('utf-8'))
producer.send(('Hello-Pulsar3').encode('utf-8'))

producer.close()
client.close()

您可以使用发送方法发送多条消息。 类.

中的主题名称应相同
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe("my-topic-reader1", "my-subscription")

//receive all the messages.whatever we publish
msg = consumer.receive()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))

//Here we are not acknowledge all the messages.

//close the consumer and client
consumer.close()
client.close()

我们在 120 秒内再次打开客户端和消费者并尝试读取相同的消息,这不是 publish.the 我们再次关闭客户端和消费者。

稍后(120 秒后),我们将再次打开客户端和消费者,然后尝试接收消息。但它不应该来。在这种情况下,您正在实现生存时间。