maxOffsetsPerTrigger 与 spark 集群中的核心数
maxOffsetsPerTrigger vs number of cores in the spark cluster
例如,我的spark结构化流应用程序以Kafka作为消息源,下面是不同配置的详细信息。
卡夫卡设置:
消息来源:kafka
分区:40
输入参数:
每次触发最大偏移量:1000
集群设置:
工人人数 = 5
数量cores/worker = 8
问题:
通过以上设置,是否读取
(1000 * 5 * 8) = 每次40000条消息
或
(1000 * 5) = 5000 条消息每次
或
读取 1000 条消息并将其分发到 5 个工作节点?
根据 documentation:
Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.
所以这是您列表中的最后一个选项,每个执行器最多将处理每个触发器 200 个偏移量,在各个核心之间分配 (25 offsets/core)。但如果您在特定触发期间没有收集到足够的数据,它可能会更小。
此外,在新版本的 Spark 中,还有其他选项,例如,minOffsetsPerTrigger
允许处理更大的批次,以防您的触发周期没有足够的数据来处理。
例如,我的spark结构化流应用程序以Kafka作为消息源,下面是不同配置的详细信息。
卡夫卡设置:
消息来源:kafka
分区:40
输入参数:
每次触发最大偏移量:1000
集群设置:
工人人数 = 5
数量cores/worker = 8
问题:
通过以上设置,是否读取
(1000 * 5 * 8) = 每次40000条消息
或
(1000 * 5) = 5000 条消息每次
或
读取 1000 条消息并将其分发到 5 个工作节点?
根据 documentation:
Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.
所以这是您列表中的最后一个选项,每个执行器最多将处理每个触发器 200 个偏移量,在各个核心之间分配 (25 offsets/core)。但如果您在特定触发期间没有收集到足够的数据,它可能会更小。
此外,在新版本的 Spark 中,还有其他选项,例如,minOffsetsPerTrigger
允许处理更大的批次,以防您的触发周期没有足够的数据来处理。