使用 Flink-Kafka 连接器均匀消费事件
Consuming events evenly using Flink-Kafka connector
我正在使用 Flink 处理来自 Kafka 的流数据。流程很基础,从Kafka消费,数据丰富,然后下沉到FS。
在我的例子中,分区数大于 Flink 并行度级别。我注意到 Flink 并没有从所有分区中均匀地消耗。
偶尔会在某些 Kafka 分区中产生滞后。
重启应用程序有助于 Flink 对消耗进行“重新平衡”,并快速关闭延迟。但是,过了一会儿,我发现其他分区有延迟等等。
看到这种行为,我尝试按照 Flink 文档中的建议使用 rebalance() 来重新平衡消耗率:
"Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew."
dataStream.rebalance();
代码更改很小,只需将 rebalance() 添加到数据流源即可。
运行 具有 rebalance() 的应用导致 Flink 出现非常奇怪的行为:
我把并行度设置为260,提交了一个job,但是不知为什么,job manager把槽数乘以4,看执行计划图,发现现在所有数据都被260消耗了核心,然后它被发送到 3 个接收器(希望是均匀的)。由于缺乏资源,作业失败。
因为我想使用 260 个内核,所以我尝试再次提交作业,这次并行度为 65 (=260/4)。
作业运行良好,但处理率较低。在网络 UI 中,我发现插槽总数不等于可用任务插槽 + 运行 任务。但是,如果我将 rtbJsonRequest(我提交的作业)称为具有 65(=260/4)个任务槽的作业,而不是它所写的 260,则等于。
长话短说,我正在尝试找到一种方法来平衡 Kafka 分区的消耗。根据 Flink 文档,rebalance() 是我需要的,但显然我用错了。
添加更多输入。主题中有520个分区,并行度为260(每个核心有2个分区)。
我看得很清楚,有几个分区的消耗率很低:
在源之后插入重新平衡不会平衡源本身,而是通过在作业图中插入循环网络洗牌来平衡后面的输入。这最多只能平衡接收器上的负载,这对您的问题没有帮助。
您总共使用了多少个 Kafka 分区?您正在使用主题或分区发现吗?重新启动作业有帮助似乎确实很奇怪。
我发现我的 2 个 Flink 任务管理器与其他 worker 相比处理速度非常低。
正如您在下面的屏幕截图中看到的,每秒少于 5K 个事件,而其他处理至少 37K:
这确实帮助我了解我遇到的是环境问题,而不是 Flink 问题。
就我而言,安装 CPU 调速器并重新启动机器解决了问题。
我在这个过程中学到了一个很重要的事情,默认情况下 Flink 不会发现 Kafka 分区。如果你想添加它,只需添加到你的属性中:
"flink.partition-discovery.interval-millis", "time_interval"
Properties properties = new Properties();
properties.setProperty("group.id", consumerGroup);
properties.setProperty("auto.offset.reset", autoOffsetReset);
properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
properties.setProperty(
"flink.partition-discovery.interval-millis", "30000");
我将分区发现添加到属性中,作业抛出 NPE.Is 这是设置分区发现的正确方法 属性 ?
java.lang.NullPointerException: null
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:77)
我正在使用 Flink 处理来自 Kafka 的流数据。流程很基础,从Kafka消费,数据丰富,然后下沉到FS。
在我的例子中,分区数大于 Flink 并行度级别。我注意到 Flink 并没有从所有分区中均匀地消耗。
偶尔会在某些 Kafka 分区中产生滞后。 重启应用程序有助于 Flink 对消耗进行“重新平衡”,并快速关闭延迟。但是,过了一会儿,我发现其他分区有延迟等等。
看到这种行为,我尝试按照 Flink 文档中的建议使用 rebalance() 来重新平衡消耗率:
"Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew."
dataStream.rebalance();
代码更改很小,只需将 rebalance() 添加到数据流源即可。 运行 具有 rebalance() 的应用导致 Flink 出现非常奇怪的行为:
我把并行度设置为260,提交了一个job,但是不知为什么,job manager把槽数乘以4,看执行计划图,发现现在所有数据都被260消耗了核心,然后它被发送到 3 个接收器(希望是均匀的)。由于缺乏资源,作业失败。
因为我想使用 260 个内核,所以我尝试再次提交作业,这次并行度为 65 (=260/4)。 作业运行良好,但处理率较低。在网络 UI 中,我发现插槽总数不等于可用任务插槽 + 运行 任务。但是,如果我将 rtbJsonRequest(我提交的作业)称为具有 65(=260/4)个任务槽的作业,而不是它所写的 260,则等于。
添加更多输入。主题中有520个分区,并行度为260(每个核心有2个分区)。
我看得很清楚,有几个分区的消耗率很低:
在源之后插入重新平衡不会平衡源本身,而是通过在作业图中插入循环网络洗牌来平衡后面的输入。这最多只能平衡接收器上的负载,这对您的问题没有帮助。
您总共使用了多少个 Kafka 分区?您正在使用主题或分区发现吗?重新启动作业有帮助似乎确实很奇怪。
我发现我的 2 个 Flink 任务管理器与其他 worker 相比处理速度非常低。
正如您在下面的屏幕截图中看到的,每秒少于 5K 个事件,而其他处理至少 37K:
这确实帮助我了解我遇到的是环境问题,而不是 Flink 问题。 就我而言,安装 CPU 调速器并重新启动机器解决了问题。
我在这个过程中学到了一个很重要的事情,默认情况下 Flink 不会发现 Kafka 分区。如果你想添加它,只需添加到你的属性中:
"flink.partition-discovery.interval-millis", "time_interval"
Properties properties = new Properties();
properties.setProperty("group.id", consumerGroup);
properties.setProperty("auto.offset.reset", autoOffsetReset);
properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
properties.setProperty(
"flink.partition-discovery.interval-millis", "30000");
我将分区发现添加到属性中,作业抛出 NPE.Is 这是设置分区发现的正确方法 属性 ?
java.lang.NullPointerException: null
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:77)