Java Kafka Consumer 在多线程中的使用
Usage of Java Kafka Consumer in multiple threads
我正在考虑在线程池中使用 Kafka Consumer。我想出了这种方法。现在看起来工作正常,但我正在考虑缺点以及这种方法可能带来的问题。基本上我需要的是将记录处理与消费分离开来。此外,我需要有一个强有力的保证,即只有在处理完所有记录后才会提交。有人可以就如何做得更好提出建议或建议吗?
final var consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topics);
final var threadPool = Executors.newFixedThreadPool(32);
while(true) {
ConsumerRecords<String, String> records;
synchronized (consumer) {
records = consumer.poll(Duration.ofMillis(100));
}
CompletableFuture.runAsync(this::processTask, threadPool).thenRun(() -> {
synchronized (consumer) {
consumer.commitSync();
}
});
}
问题
此解决方案不适合规定的要求:
Also, I need to have a strong guarantee that commits happens only after all records are processed
场景:
- 轮询读取 100 条记录,开始异步处理
- 轮询读取 5 条记录,开始异步处理
- 5 条记录的处理立即发生,消费者提交完成,而 100 条记录的处理仍在进行中
- 消费者崩溃
再次启动消费者时,最后一次提交将对应第105条记录。因此它将开始处理第 106 条记录,我们错过了成功处理记录 1-100 的机会。
您只需要通过以下方式提交您正在该轮询中处理的偏移量:
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
此外,需要保证顺序,以便首先提交第一个轮询,然后是第二个,依此类推。这会相当复杂。
命题
我相信您正在尝试在消息处理中实现并发。这可以通过更简单的解决方案来实现。增加 max.poll.records 以读取像样的批次,将其分成更小的批次并 运行 它们以异步方式实现并发。完成所有批次后,提交给 kafka 消费者。
我看到了以下文章,它解耦了 kafka 中记录的消费和处理。您可以通过显式调用 poll()
方法并借助 pause()
和 resume()
方法处理记录来实现此目的。
我正在考虑在线程池中使用 Kafka Consumer。我想出了这种方法。现在看起来工作正常,但我正在考虑缺点以及这种方法可能带来的问题。基本上我需要的是将记录处理与消费分离开来。此外,我需要有一个强有力的保证,即只有在处理完所有记录后才会提交。有人可以就如何做得更好提出建议或建议吗?
final var consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topics);
final var threadPool = Executors.newFixedThreadPool(32);
while(true) {
ConsumerRecords<String, String> records;
synchronized (consumer) {
records = consumer.poll(Duration.ofMillis(100));
}
CompletableFuture.runAsync(this::processTask, threadPool).thenRun(() -> {
synchronized (consumer) {
consumer.commitSync();
}
});
}
问题
此解决方案不适合规定的要求:
Also, I need to have a strong guarantee that commits happens only after all records are processed
场景:
- 轮询读取 100 条记录,开始异步处理
- 轮询读取 5 条记录,开始异步处理
- 5 条记录的处理立即发生,消费者提交完成,而 100 条记录的处理仍在进行中
- 消费者崩溃
再次启动消费者时,最后一次提交将对应第105条记录。因此它将开始处理第 106 条记录,我们错过了成功处理记录 1-100 的机会。
您只需要通过以下方式提交您正在该轮询中处理的偏移量:
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
此外,需要保证顺序,以便首先提交第一个轮询,然后是第二个,依此类推。这会相当复杂。
命题
我相信您正在尝试在消息处理中实现并发。这可以通过更简单的解决方案来实现。增加 max.poll.records 以读取像样的批次,将其分成更小的批次并 运行 它们以异步方式实现并发。完成所有批次后,提交给 kafka 消费者。
我看到了以下文章,它解耦了 kafka 中记录的消费和处理。您可以通过显式调用 poll()
方法并借助 pause()
和 resume()
方法处理记录来实现此目的。