按关键阶段分组的光束流管道中的缓慢/滞后
Slowness / Lag in beam streaming pipeline in group by key stage
上下文
大家好,我一直在使用 Apache Beam
管道生成列式数据库以存储在 GCS
中,我有一个数据流来自 Kafka
并且有一个 window 的 1 米。
我想将那 1m window 的所有数据转换成一个列式 DB 文件(ORC
在我的例子中,可以是 Parquet
或其他任何东西),我写了一个此转换的管道。
问题
我遇到了普遍缓慢的问题。我怀疑这可能是由于我只有钥匙的钥匙转换组。真的有必要这样做吗?如果不是,应该怎么做?我读到 combine 对此不是很有用,因为我的管道并不是真正聚合数据而是创建合并文件。我真正需要的是每个 window 的可迭代对象列表,它将被转换为 ORC 文件。
管道表示法
input -> window -> group by key (only 1 key) -> pardo (to create DB) -> IO (to write to GCS)
我试过的
我试过使用分析器缩放 horizontally/vertically。使用分析器,我看到超过 50% 的时间通过按键操作进入组。我确实相信问题出在热键上,但我无法找到应该做什么的解决方案。当我删除 group by key
操作时,我的管道跟上了 Kafka
滞后(即,在 Kafka
端似乎不是问题)。
代码片段
p.apply("ReadLines", KafkaIO.<Long, byte[]>read().withBootstrapServers("myserver.com:9092")
.withTopic(options.getInputTopic())
.withTimestampPolicyFactory(MyTimePolicy.myTimestampPolicyFactory())
.withConsumerConfigUpdates(Map.of("group.id", "mygroup-id")).commitOffsetsInFinalize()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class).withoutMetadata())
.apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
.apply("DecodeProto", ParDo.of(new DecodePromProto()))
.apply("MapTSSample", ParDo.of(new MapTSSample()))
.apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.apply(WithKeys.<Integer, TSSample>of(1))
.apply(GroupByKey.<Integer, TSSample>create())
.apply("CreateTSORC", ParDo.of(new CreateTSORC()))
.apply(new WriteOneFilePerWindow(options.getOutput(), 1));
墙时间配置文件
https://gist.github.com/anandsinghkunwar/4cc26f7e3da7473af66ce9a142a74c35
问题确实似乎是 hot keys issue,我不得不更改我的管道来为 ORC 文件创建自定义 IO,并将分片数量增加到 50
。我完全删除了 GroupByKey
。由于 Beam 尚未自动确定 FileIO.write()
的分片数量,您必须手动选择适合您的工作负载的数量。
此外,在 Google 数据流中启用流引擎 API 进一步加快了摄取速度。
上下文
大家好,我一直在使用 Apache Beam
管道生成列式数据库以存储在 GCS
中,我有一个数据流来自 Kafka
并且有一个 window 的 1 米。
我想将那 1m window 的所有数据转换成一个列式 DB 文件(ORC
在我的例子中,可以是 Parquet
或其他任何东西),我写了一个此转换的管道。
问题
我遇到了普遍缓慢的问题。我怀疑这可能是由于我只有钥匙的钥匙转换组。真的有必要这样做吗?如果不是,应该怎么做?我读到 combine 对此不是很有用,因为我的管道并不是真正聚合数据而是创建合并文件。我真正需要的是每个 window 的可迭代对象列表,它将被转换为 ORC 文件。
管道表示法
input -> window -> group by key (only 1 key) -> pardo (to create DB) -> IO (to write to GCS)
我试过的
我试过使用分析器缩放 horizontally/vertically。使用分析器,我看到超过 50% 的时间通过按键操作进入组。我确实相信问题出在热键上,但我无法找到应该做什么的解决方案。当我删除 group by key
操作时,我的管道跟上了 Kafka
滞后(即,在 Kafka
端似乎不是问题)。
代码片段
p.apply("ReadLines", KafkaIO.<Long, byte[]>read().withBootstrapServers("myserver.com:9092")
.withTopic(options.getInputTopic())
.withTimestampPolicyFactory(MyTimePolicy.myTimestampPolicyFactory())
.withConsumerConfigUpdates(Map.of("group.id", "mygroup-id")).commitOffsetsInFinalize()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class).withoutMetadata())
.apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
.apply("DecodeProto", ParDo.of(new DecodePromProto()))
.apply("MapTSSample", ParDo.of(new MapTSSample()))
.apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.apply(WithKeys.<Integer, TSSample>of(1))
.apply(GroupByKey.<Integer, TSSample>create())
.apply("CreateTSORC", ParDo.of(new CreateTSORC()))
.apply(new WriteOneFilePerWindow(options.getOutput(), 1));
墙时间配置文件
https://gist.github.com/anandsinghkunwar/4cc26f7e3da7473af66ce9a142a74c35
问题确实似乎是 hot keys issue,我不得不更改我的管道来为 ORC 文件创建自定义 IO,并将分片数量增加到 50
。我完全删除了 GroupByKey
。由于 Beam 尚未自动确定 FileIO.write()
的分片数量,您必须手动选择适合您的工作负载的数量。
此外,在 Google 数据流中启用流引擎 API 进一步加快了摄取速度。