使用多个子拓扑优化 Kafka Streams 应用程序
Optimizing a Kafka Streams Application with Multiple Sub-Topologies
我正在 运行 构建一个具有三个子拓扑的 Kafka Streams 应用程序。 activity的阶段大致如下:
stream
主题 A
selectKey
并重新分区主题 A to
主题 B
stream
主题 B
foreach
主题 B 到主题 C Producer
stream
主题 C
- 主题 C
to
主题 D
主题A、B、C各物化,也就是说如果每个主题有40个分区,我的最大并行度是120。
起初我运行宁宁 5 个流应用程序,每个应用程序有 8 个线程。通过此设置,我遇到了不一致的性能。似乎某些共享同一线程的子拓扑比其他子拓扑更渴望 CPU,一段时间后,我会收到此错误:Member [client_id] in group [consumer_group] has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
。一切都会重新平衡,这可能会导致性能下降,直到下一次失败和重新平衡。
我的问题如下:
- 多个子拓扑如何能够在一个线程上运行?轮询队列?
- 每个线程如何决定如何为其每个子拓扑分配计算资源?
- 在这种情况下,您如何优化线程与主题分区的比率以避免周期性的消费者故障?例如,1:1 比率能否确保更一致的性能?
- 如果您使用 1:1 比率,您如何确保每个线程都分配有自己的主题分区并且一些线程不会闲置?
线程将对不同子拓扑的所有主题进行 poll() 并检查记录 topic
元数据以将其提供给正确的任务。
每个子拓扑都被同等对待,即,如果您愿意,可用资源会均匀分布。
1:1 比率仅在您有足够的内核时才有用。我建议监控您的 CPU 利用率。如果它太高(大于 80%),你应该添加更多 cores/threads.
Kafka Streams 会自动为您处理。
一些一般性评论:
- 您可能会考虑增加
max.poll.interval.ms
配置以避免消费者退出群组
- 您可能会考虑减少
max.poll.records
以减少每次 poll()
调用的记录,从而减少两次连续调用 poll()
. 之间的时间
- 请注意,
max.poll.records
并不意味着增加 network/broker 通信——如果单个获取请求 return 的记录多于 max.poll.records
配置,数据将被缓冲在消费者中,下一个 poll()
将从缓冲数据中提供服务,避免代理往返
我正在 运行 构建一个具有三个子拓扑的 Kafka Streams 应用程序。 activity的阶段大致如下:
stream
主题 AselectKey
并重新分区主题 Ato
主题 Bstream
主题 Bforeach
主题 B 到主题 CProducer
stream
主题 C- 主题 C
to
主题 D
主题A、B、C各物化,也就是说如果每个主题有40个分区,我的最大并行度是120。
起初我运行宁宁 5 个流应用程序,每个应用程序有 8 个线程。通过此设置,我遇到了不一致的性能。似乎某些共享同一线程的子拓扑比其他子拓扑更渴望 CPU,一段时间后,我会收到此错误:Member [client_id] in group [consumer_group] has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
。一切都会重新平衡,这可能会导致性能下降,直到下一次失败和重新平衡。
我的问题如下:
- 多个子拓扑如何能够在一个线程上运行?轮询队列?
- 每个线程如何决定如何为其每个子拓扑分配计算资源?
- 在这种情况下,您如何优化线程与主题分区的比率以避免周期性的消费者故障?例如,1:1 比率能否确保更一致的性能?
- 如果您使用 1:1 比率,您如何确保每个线程都分配有自己的主题分区并且一些线程不会闲置?
线程将对不同子拓扑的所有主题进行 poll() 并检查记录
topic
元数据以将其提供给正确的任务。每个子拓扑都被同等对待,即,如果您愿意,可用资源会均匀分布。
1:1 比率仅在您有足够的内核时才有用。我建议监控您的 CPU 利用率。如果它太高(大于 80%),你应该添加更多 cores/threads.
Kafka Streams 会自动为您处理。
一些一般性评论:
- 您可能会考虑增加
max.poll.interval.ms
配置以避免消费者退出群组 - 您可能会考虑减少
max.poll.records
以减少每次poll()
调用的记录,从而减少两次连续调用poll()
. 之间的时间
- 请注意,
max.poll.records
并不意味着增加 network/broker 通信——如果单个获取请求 return 的记录多于max.poll.records
配置,数据将被缓冲在消费者中,下一个poll()
将从缓冲数据中提供服务,避免代理往返