How to attach ConsumerRebalanceListener to ReactiveKafkaConsumerTemplate

For a non-reactive consumer, we can attach a listener using consumer.subscribe:

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
      consumer.subscribe(Collections.singleton("example-topic-2020-6-24"),
              new ConsumerRebalanceListener() {
                  @Override
                  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                      System.out.printf("onPartitionsRevoked - consumerName: %s, partitions: %s%n", name,
                              formatPartitions(partitions));
                  }

                  @Override
                  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                      System.out.printf("onPartitionsAssigned - consumerName: %s, partitions: %s%n", name,
                              formatPartitions(partitions));
                  }
              });

How can this be done with ReactiveKafkaConsumerTemplate?

You can't do it directly; instead, use ReceiverOptions to register the listeners:

https://github.com/reactor/reactor-kafka/blob/1c677186782be613385278fa500316e4bbb8dfaf/src/main/java/reactor/kafka/receiver/ReceiverOptions.java#L94-L114
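A minimal sketch of how that might look, assuming spring-kafka and reactor-kafka are on the classpath. The broker address, group id, and class name are placeholders, not taken from the question. Note that the listeners receive ReceiverPartition objects rather than TopicPartition, so the sketch maps them with topicPartition() before printing.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;

public class RebalanceListenerExample {

    // Convert reactor-kafka's ReceiverPartition wrappers to plain TopicPartitions for logging.
    private static List<TopicPartition> toTopicPartitions(Collection<ReceiverPartition> partitions) {
        return partitions.stream()
                .map(ReceiverPartition::topicPartition)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Placeholder connection settings (assumptions); replace with your own.
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Chain the calls and keep the returned instance, since newer
        // reactor-kafka versions return a new ReceiverOptions from each call.
        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(consumerProps)
                .addAssignListener(partitions ->
                        System.out.printf("onPartitionsAssigned - partitions: %s%n", toTopicPartitions(partitions)))
                .addRevokeListener(partitions ->
                        System.out.printf("onPartitionsRevoked - partitions: %s%n", toTopicPartitions(partitions)))
                .subscription(Collections.singleton("example-topic-2020-6-24"));

        ReactiveKafkaConsumerTemplate<String, String> template =
                new ReactiveKafkaConsumerTemplate<>(options);

        // The listeners fire once the Flux is subscribed and the consumer joins the group.
        template.receive()
                .doOnNext(record -> System.out.printf("received: %s%n", record.value()))
                .blockLast();
    }
}
```

The blockLast() call is only there to keep this standalone example alive; in a real application you would normally subscribe from within your reactive pipeline instead.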