如何将 ConsumerRebalanceListener 附加到 ReactiveKafkaConsumerTemplate
How to attach ConsumerRebalanceListener to ReactiveKafkaConsumerTemplate
对于非响应式消费者,我们可以使用 consumer.subscribe 附加一个监听器:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singleton("example-topic-2020-6-24"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("onPartitionsRevoked - consumerName: %s, partitions: %s%n", name,
formatPartitions(partitions));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.printf("onPartitionsAssigned - consumerName: %s, partitions: %s%n", name,
formatPartitions(partitions));
}
});
ReactiveKafkaConsumerTemplate 怎么做?
对于非响应式消费者,我们可以使用 consumer.subscribe 附加一个监听器:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singleton("example-topic-2020-6-24"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("onPartitionsRevoked - consumerName: %s, partitions: %s%n", name,
formatPartitions(partitions));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.printf("onPartitionsAssigned - consumerName: %s, partitions: %s%n", name,
formatPartitions(partitions));
}
});
ReactiveKafkaConsumerTemplate 怎么做?