哪个 kafka 属性 决定了 KafkaConsumer 的轮询频率?

Which kafka property decides Poll frequency for KafkaConsumer?

我正在尝试了解有关 kafka 流(kafka 流客户端到 kafka)的一些细节。

我知道 KafkConsumer(java 客户端)会从 kafka 获取数据,但是我无法理解客户端轮询 kakfa 主题以获取数据的频率是多少?

轮询的频率由您的代码定义,因为您负责调用轮询。 使用 KafkaConsumer 的用户代码的一个非常天真的例子就像 following

public class KafkaConsumerExample {
  ...


    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;   int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(1000);

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }
}

在这种情况下,频率由 consumerRecords.forEach 中处理消息的持续时间定义。

但是,请记住,如果您不调用 poll "fast enough",您的消费者将被代理协调器视为已死亡,并且将触发重新平衡。 这个"fast enough"是由kafka>=0.10.1.0中的属性max.poll.interval.ms决定的。有关详细信息,请参阅

max.poll.interval.ms 默认值为五分钟,因此如果您的 consumerRecords.forEach 花费的时间超过该时间,您的消费者将被视为已死亡。

如果您不想直接使用原始 KafkaConsumer,您可以使用 alpakka kafka,一个用于在 safe[= 中消费和生产 kafka 主题的库37=] 和背压方式(基于 akka 流)。
使用此库,轮询频率由配置决定 akka.kafka.consumer.poll-interval.
我们说是安全的,因为即使您的处理速度跟不上,它也会继续轮询以避免消费者被视为已死。它能够做到这一点,因为 KafkaConsumer 允许暂停消费者

 /**
     * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
     * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
     * Note that this method does not affect partition subscription. In particular, it does not cause a group
     * rebalance when automatic assignment is used.
     * @param partitions The partitions which should be paused
     * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
     */
    @Override
    public void pause(Collection<TopicPartition> partitions) { ... }

要完全理解这一点,您应该阅读 akka-streams 和背压。