Spring-卡夫卡并发属性

Spring-Kafka Concurrency Property

我正在使用 Spring-Kafka 编写我的第一个 Kafka 消费者。查看了框架提供的不同选项,并且对此几乎没有疑问。如果您已经处理过,有人可以在下面澄清一下吗?

问题 - 1 : 根据 Spring-Kafka 文档,有两种实现 Kafka-Consumer 的方法; "You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation"。谁能告诉我什么时候应该选择一个选项而不是另一个选项?

问题 - 2:我选择了 KafkaListener 方法来编写我的应用程序。为此,我需要初始化一个容器工厂实例,并且在容器工厂内部有一个控制并发的选项。只是想仔细检查一下我对并发的理解是否正确。

假设,我有一个名为 MyTopic 的主题,其中有 4 个分区。为了使用来自 MyTopic 的消息,我启动了我的应用程序的 2 个实例,这些实例是通过将并发设置为 2 来启动的。因此,理想情况下,根据 kafka 分配策略,2 个分区应该转到 consumer1,另外 2 个分区应该转到 consumer2 .由于并发设置为2,是否每个消费者都会启动2个线程,并行消费主题中的数据?如果我们并行消费,我们还应该考虑任何事情。

问题 3 - 我选择了手动确认模式,并且不在外部管理偏移量(不将其持久化到任何 database/filesystem)。那么我是否需要编写自定义代码来处理再平衡,或者框架会自动管理它?我认为不会,因为我只是在处理完所有记录后才承认。

问题 - 4 : 另外,在手动 ACK 模式下,哪个 Listener 会提供更多性能? BATCH 消息监听器或普通消息监听器。我想如果我使用普通消息监听器,偏移量将在处理完每条消息后提交。

粘贴下面的代码供您参考。

批量确认消费者

    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
          Consumer<?, ?> consumer) {
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Record : " + record.value());
          // Process the message here..
          listener.addOffset(record.topic(), record.partition(), record.offset());
       }
       acknowledgment.acknowledge();
    }

正在初始化容器工厂:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return configs;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    // Not sure about the impact of this property, so going with 1
    factory.setConcurrency(2);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
    return factory;
}
  1. @KafkaListener 是消息驱动的 "POJO" 它添加了负载转换、参数匹配等内容。如果你实现 MessageListener 你只能得到来自 Kafka 的原始 ConsumerRecord。见 @KafkaListener Annotation.

  2. 对,并发表示线程数;每个线程创建一个 Consumer;他们 运行 并行;在您的示例中,每个分区将获得 2 个分区。

Also should we consider anything if we are consuming in parallel.

您的侦听器必须是线程安全的(没有共享状态或任何此类状态需要锁保护。

  1. 你说的 "handle rebalance events" 不清楚是什么意思。当重新平衡发生时,框架将提交任何未决的偏移量。

  2. 没有区别;消息监听器对比批处理侦听器只是一种偏好。即使使用消息侦听器,使用 MANUAL ackmode,在处理完轮询的所有结果时也会提交偏移量。在 MANUAL_IMMEDIATE 模式下,偏移量是一个一个地提交的。

Q1:

根据文档,

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

You can configure most attributes on the annotation with SpEL by using "#{…​} or property placeholders (${…​}). See the Javadoc for more information."

这种方法对于简单的 POJO 侦听器很有用,您不需要实现任何接口。您还可以使用注释以声明方式收听任何主题和分区。您还可以 return 您收到的值,而在 MessageListener 的情况下,您受接口签名的约束。

Q2:

最好是。如果您有多个主题可以使用,那么它会变得更加复杂。 Kafka 默认使用 RangeAssignor,它有自己的行为(您可以更改它——查看更多详细信息 under)。

Q3:

如果你的消费者死了,就会有再平衡。如果您手动确认并且您的消费者在提交抵消之前死亡,则您无需执行任何操作,Kafka 会处理。但是您最终可能会收到一些重复的消息(至少一次)

Q4:

这取决于你所说的 "performance" 是什么意思。如果您的意思是延迟,那么尽可能快地使用每条记录将是可行的方法。如果要实现高吞吐量,那么批量消费效率更高

我使用 Spring kafka 和各种监听器编写了一些示例 - 查看 this repo