使用 AMQP 在 Azure 事件中心检查消费者组的位置

Checkpointing position of a Consumer Group in Azure Event Hubs with AMQP

我正在开发一些代码,使用 AMQP 和 Apache Qpid 库来处理 Azure 事件中心的事件。我注意到的一件事是,当我的应用程序重新启动时,所有消息都会从消费者组/分区重新读取。

我的假设是我的消费者没有像它应该的那样设置检查点(基于 https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#event-consumers),但我不确定我需要在 JMS 消费者上设置什么选项才能做到这一点。

我当前的连接代码(在附加消息侦听器之前)看起来像这样:

    final ConnectionFactory factory = new JmsConnectionFactory(uri);
    final Connection connection = factory.createConnection();
    connection.start();

    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

我需要在 URL 选项方面做些什么来导致检查点发生吗?

这个问题的简短回答是 AMQP 没有内置检查点的概念,而且 JMS 也没有。这样做的结果是,每次通过 AMQP 读取的应用程序启动时,它将从事件流的开头开始读取并重新处理所有内容。

如果应用程序开发得当(因为很可能有意倒带),这应该不会导致功能问题,但它确实有可能非常浪费资源。最后,我决定使用 Microsoft 的事件中心客户端 Java,它内置了检查点支持。

我在我的 github 页面上的一些示例代码中勾画出了这个,比较 https://github.com/michaeljmcd/eventhub-qpid-example and https://github.com/michaeljmcd/eventhub-client-example