Kafka consumer/partition vs thread/partition 关系

Kafka consumer/partition vs thread/partition relationship

我是 Apache Kafka 的新手,我想了解它们之间的区别:

  1. 创建属于同一组 ID 的两个消费者,它们从同一主题的两个分区中消费。
  2. 使用从同一主题的两个分区中消费的两个线程创建一个消费者。

在第一种方式中,我真正理解的是每个消费者只会消费与其“相关”的分区的消息,因为这两个消费者属于同一组。

正如我在下图示例中看到的,consumer1 将使用 AAAA 和 BBBB,而 consumer2 将使用 CCCC 和 DDDD。

在第二种方法中,正如我在文档中看到的那样:

Consumer with many threads: If processing a record takes a while, a single Consumer can run multiple threads to process records, but it is harder to manage offset for each Thread/Task. If one consumer runs multiple threads, then two messages on the same partitions could be processed by two different threads which make it hard to guarantee record delivery order without complex thread coordination. This setup might be appropriate if processing a single task takes a long time, but try to avoid it.

因此在下面的示例中,可能会发生一些不同的情况:

这两种方法之间的区别看起来微不足道。在第一种方法中,每个消费者仅按顺序使用分配给它的分区,而在第二种方法中,每个线程都可以从两个分区中使用,但可能会发生线程 1 消耗 AAAA 和线程 2 BBBB,并且 BBBB 的进程在进程 AAAA 之前完成完成了。

我用springboot开发了一个Kafka消费者。我的 kafka 主题有 5 个分区。每个分区都有需要顺序消费的信息。我决定只创建一个有 5 个线程的消费者,而不是创建 5 个消费者。从理论上讲,这并没有多大意义,因为我需要按顺序使用每个分区的信息,而创建线程会破坏该逻辑。​​

在 Kafka 侦听器中,我定义了 RECEIVED_PARTITION_ID 来打印哪个分区正在使用数据。我已经将 log4j 配置为打印哪个线程是 运行ning KafkaListener,例如:[org.springframework.kafka.KafkaListenerEndpointContainer#0-thread-C-1]

KafkaConsumer.java的代码:

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupid}", containerFactory = "kafkaListenerContainerFactory")    
public void consumeJson(String mensaje,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionID) throws InterruptedException {
        logger.info("-> Consumed JSon message");
        logger.info("-> Read from partition:" + partitionID);
        logger.info("-> Consumed JSON Message: [" + mensaje.toString()+"]");
}

KafkaConfiguration.java的代码:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPoll);
    // Commit manual (para poner AckMode.BATCH), por configuración
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(5);
    // Commit the offset when the listener returns after processing the record. 
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
}

当我 运行 我的 springboot 应用程序和我看到日志时,看起来一个 thread/partition 关系已经完成,就像一个 consumer/partition 已经建立一样。每个线程只消耗一个分区的数据!线程 0 -> 分区 0,线程 1 -> 分区 1,线程 2 -> 分区 2,线程 3 -> 分区 3,线程 4 -> 分区 4。我检查了 thwosands 的日志行,它总是匹配。

小部分日志:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition3

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition1

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition0

[org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition4

[org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition2

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition1

这对我来说没有意义。正如文档所说:

Consumer with many threads: If processing a record takes a while, a single Consumer can run multiple threads to process records, but it is harder to manage offset for each Thread/Task. If one consumer runs multiple threads, then two messages on the same partitions could be processed by two different threads which make it hard to guarantee record delivery order without complex thread coordination. This setup might be appropriate if processing a single task takes a long time, but try to avoid it.

任何人都可以向我解释发生了什么事吗? 同一消费者组5消费者/5分区关系和1消费者/5线程关系有什么区别?

现在看起来是一样的。

谢谢!

你永远不应该执行异步处理;使用5个消费者,每个消费者得到一个分区,是正确的做法,也是保证顺序处理的唯一方法。

使用单个消费者的多个线程会导致偏移量管理出现问题。

将并发设置为 5 创建 5 个消费者。

This doesn´t make sense to me. As the documentation says:

哪个“文档”?

最后一句话正确:

This setup might be appropriate if processing a single task takes a long time, but try to avoid it.

重要的词是might;但我会说永远不会。