KafkaStream 吞吐量低
Low throughput on KafkaStream
我的 kafka 流吞吐量有一些问题。我尝试阅读一个包含 +90M 记录的主题。使用我的 kafka 流应用程序,它基本上只打印每条记录,我得到了 ~4K records/second 的吞吐量。但是,如果我使用基本的 kafka-avro-console-consumer 命令行使用完全相同的主题,我将获得 ~80K records/second 的吞吐量!是否有一些已知的限制可以解释为什么流应用程序的性能应该低于 kafka-avro-console-consumer 的基础?关于我应该调整哪个流配置以获得更好的性能的任何指导?
我的配置是:
Properties configs = new Properties();
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServer());
configs.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
kafkaConfig.getSchemaRegistryServer());
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "KS-test3");
并且拓扑简单地执行:
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream(scheduleEventTopic)
.foreach(this::printRecord);
return streamsBuilder.build();
尝试将 max.poll.records
的值增加到更高的值。这个配置意味着你一次可以得到的记录数poll()
max.poll.records (1000 default)
您可能还想查看 max.poll.interval.ms
每次投票之间的时间并尝试减少它并查看。
此外,您可能希望增加流线程数并将其设置为否。您正在使用的主题的分区数。
num.stream.threads (1 default)
参考:https://docs.confluent.io/current/streams/developer-guide/config-streams.html
P.S:默认值来自上述参考,您的可能会有所不同。
我真的找到了我的问题。 commit.interval
设置为 0 以禁用我的聚合中的批处理。相反,我使用 cache.max.bytes.buffering
在不影响性能的情况下获得相同的效果。我的吞吐量从 4K tps 增加到 100k tps
我的 kafka 流吞吐量有一些问题。我尝试阅读一个包含 +90M 记录的主题。使用我的 kafka 流应用程序,它基本上只打印每条记录,我得到了 ~4K records/second 的吞吐量。但是,如果我使用基本的 kafka-avro-console-consumer 命令行使用完全相同的主题,我将获得 ~80K records/second 的吞吐量!是否有一些已知的限制可以解释为什么流应用程序的性能应该低于 kafka-avro-console-consumer 的基础?关于我应该调整哪个流配置以获得更好的性能的任何指导?
我的配置是:
Properties configs = new Properties();
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServer());
configs.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
kafkaConfig.getSchemaRegistryServer());
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "KS-test3");
并且拓扑简单地执行:
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream(scheduleEventTopic)
.foreach(this::printRecord);
return streamsBuilder.build();
尝试将 max.poll.records
的值增加到更高的值。这个配置意味着你一次可以得到的记录数poll()
max.poll.records (1000 default)
您可能还想查看 max.poll.interval.ms
每次投票之间的时间并尝试减少它并查看。
此外,您可能希望增加流线程数并将其设置为否。您正在使用的主题的分区数。
num.stream.threads (1 default)
参考:https://docs.confluent.io/current/streams/developer-guide/config-streams.html
P.S:默认值来自上述参考,您的可能会有所不同。
我真的找到了我的问题。 commit.interval
设置为 0 以禁用我的聚合中的批处理。相反,我使用 cache.max.bytes.buffering
在不影响性能的情况下获得相同的效果。我的吞吐量从 4K tps 增加到 100k tps