按关键阶段分组的光束流管道中的缓慢/滞后

Slowness / Lag in beam streaming pipeline in group by key stage

上下文

大家好,我一直在使用 Apache Beam 管道生成列式数据库以存储在 GCS 中,我有一个数据流来自 Kafka 并且有一个 window 的 1 米。

我想将那 1m window 的所有数据转换成一个列式 DB 文件(ORC 在我的例子中,可以是 Parquet 或其他任何东西),我写了一个此转换的管道。

问题

我遇到了普遍缓慢的问题。我怀疑这可能是由于我只有钥匙的钥匙转换组。真的有必要这样做吗?如果不是,应该怎么做?我读到 combine 对此不是很有用,因为我的管道并不是真正聚合数据而是创建合并文件。我真正需要的是每个 window 的可迭代对象列表,它将被转换为 ORC 文件。

管道表示法

input -> window -> group by key (only 1 key) -> pardo (to create DB) -> IO (to write to GCS)

我试过的

我试过使用分析器缩放 horizontally/vertically。使用分析器,我看到超过 50% 的时间通过按键操作进入组。我确实相信问题出在热键上,但我无法找到应该做什么的解决方案。当我删除 group by key 操作时,我的管道跟上了 Kafka 滞后(即,在 Kafka 端似乎不是问题)。

代码片段

p.apply("ReadLines", KafkaIO.<Long, byte[]>read().withBootstrapServers("myserver.com:9092")
    .withTopic(options.getInputTopic())
    .withTimestampPolicyFactory(MyTimePolicy.myTimestampPolicyFactory())
    .withConsumerConfigUpdates(Map.of("group.id", "mygroup-id")).commitOffsetsInFinalize()
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class).withoutMetadata())
    .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
    .apply("DecodeProto", ParDo.of(new DecodePromProto()))
    .apply("MapTSSample", ParDo.of(new MapTSSample()))
    .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
    .apply(WithKeys.<Integer, TSSample>of(1))
    .apply(GroupByKey.<Integer, TSSample>create())
    .apply("CreateTSORC", ParDo.of(new CreateTSORC()))
    .apply(new WriteOneFilePerWindow(options.getOutput(), 1));

墙时间配置文件

https://gist.github.com/anandsinghkunwar/4cc26f7e3da7473af66ce9a142a74c35

问题确实似乎是 hot keys issue,我不得不更改我的管道来为 ORC 文件创建自定义 IO,并将分片数量增加到 50。我完全删除了 GroupByKey 。由于 Beam 尚未自动确定 FileIO.write() 的分片数量,您必须手动选择适合您的工作负载的数量。

此外,在 Google 数据流中启用流引擎 API 进一步加快了摄取速度。