使用从 TextIO 到 BigQuery 的无界 PCollection 时,数据卡在 BigQueryIO 内部的 Reshuffle/GroupByKey
When using unbounded PCollection from TextIO to BigQuery, data is stuck in Reshuffle/GroupByKey inside of BigQueryIO
我正在使用 TextIO 从云存储中读取数据。因为我想连续 运行 工作,所以我使用 watchForNewFiles。
为了完整性,如果我使用有界 PCollections(在批处理模式下没有 watchForNewFiles 和 BigQueryIO),我读取的数据工作正常,所以没有数据问题。
我有 p.run().waitUntilFinish();在我的代码中,所以管道运行。而且它不会给出任何错误。
Apache Beam 版本为 2.8.0
PCollection<String> stream =
p.apply("Read File", TextIO
.read()
.from(options.getInput())
.watchForNewFiles(
Duration.standardMinutes(1),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
)
.withCompression(Compression.AUTO));
这工作得很好,一旦文件可用就会读取文件。 PCollection 是无界的,包含来自这些文件的文本行。
经过一些改造
PCollection<List<String>> lines = stream.apply("Parse CSV",
ParDo.of(new ParseCSV())
);
PCollection<TableRow> rows = lines.apply("Convert to BQ",
ParDo.of(new BigQueryConverter(schema))
);
ParseCSV 步骤通过 outputWithTimestamp 将时间戳添加到其接收器。
我最终得到了 TableRows 的 PCollection,准备流式传输到 BigQuery。
为此,我使用
WriteResult result = rows.apply("WriteToBigQuery",
BigQueryIO.
<TableRow>write()
.withFormatFunction(input -> input)
.withSchema(bqSchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(options.getOutput())
);
这不会将数据写入 BigQuery。如果我查看 UI,我发现 BigQueryIO 确实
- ShardTableWrites
- TagWithUniqueId
- 重新洗牌
- Window.into
- GroupByKey
数据进入和离开前两步。但从来没有重新洗牌。这只读取数据但从不传递数据。 Reshuffle 中导致的步骤是 GroupByKey。
由于集合是无界的,我尝试将 window 配置为
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
);
这会强制执行 GroupByKey 的任何操作在 10 秒后释放 window。但事实并非如此。
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
.withAllowedLateness(Duration.standardSeconds(0))
.discardingFiredPanes()
);
在处理时间上添加特定触发器也无济于事。
有什么线索吗?提前致谢!
一个解决方法可能是(对我有用)为每个元素分配一个新键并强制数据流使用 Reshuffle 或 GroupByKey 解耦转换。
streams.apply(WithKeys.of(input -> 1)).setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
.apply(Reshuffle.of())
.apply(MapElements.via(new SimpleFunction<KV<Integer, String>, String>() {
@Override
public String apply(KV<Integer, String> input) {
return input.getValue();
}
}))
.apply("convertToTableRow", ...)
.apply("WriteToBigQuery", ...)
密钥可以是示例中的常量或随机数。如果选择随机,则必须将范围设置得足够小以适合 JVM 内存。喜欢ThreadLocalRandom.current().nextInt(0, 5000)
我正在使用 TextIO 从云存储中读取数据。因为我想连续 运行 工作,所以我使用 watchForNewFiles。
为了完整性,如果我使用有界 PCollections(在批处理模式下没有 watchForNewFiles 和 BigQueryIO),我读取的数据工作正常,所以没有数据问题。
我有 p.run().waitUntilFinish();在我的代码中,所以管道运行。而且它不会给出任何错误。
Apache Beam 版本为 2.8.0
PCollection<String> stream =
p.apply("Read File", TextIO
.read()
.from(options.getInput())
.watchForNewFiles(
Duration.standardMinutes(1),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
)
.withCompression(Compression.AUTO));
这工作得很好,一旦文件可用就会读取文件。 PCollection 是无界的,包含来自这些文件的文本行。
经过一些改造
PCollection<List<String>> lines = stream.apply("Parse CSV",
ParDo.of(new ParseCSV())
);
PCollection<TableRow> rows = lines.apply("Convert to BQ",
ParDo.of(new BigQueryConverter(schema))
);
ParseCSV 步骤通过 outputWithTimestamp 将时间戳添加到其接收器。
我最终得到了 TableRows 的 PCollection,准备流式传输到 BigQuery。 为此,我使用
WriteResult result = rows.apply("WriteToBigQuery",
BigQueryIO.
<TableRow>write()
.withFormatFunction(input -> input)
.withSchema(bqSchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(options.getOutput())
);
这不会将数据写入 BigQuery。如果我查看 UI,我发现 BigQueryIO 确实
- ShardTableWrites
- TagWithUniqueId
- 重新洗牌
- Window.into
- GroupByKey
数据进入和离开前两步。但从来没有重新洗牌。这只读取数据但从不传递数据。 Reshuffle 中导致的步骤是 GroupByKey。
由于集合是无界的,我尝试将 window 配置为
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
);
这会强制执行 GroupByKey 的任何操作在 10 秒后释放 window。但事实并非如此。
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
.withAllowedLateness(Duration.standardSeconds(0))
.discardingFiredPanes()
);
在处理时间上添加特定触发器也无济于事。 有什么线索吗?提前致谢!
一个解决方法可能是(对我有用)为每个元素分配一个新键并强制数据流使用 Reshuffle 或 GroupByKey 解耦转换。
streams.apply(WithKeys.of(input -> 1)).setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
.apply(Reshuffle.of())
.apply(MapElements.via(new SimpleFunction<KV<Integer, String>, String>() {
@Override
public String apply(KV<Integer, String> input) {
return input.getValue();
}
}))
.apply("convertToTableRow", ...)
.apply("WriteToBigQuery", ...)
密钥可以是示例中的常量或随机数。如果选择随机,则必须将范围设置得足够小以适合 JVM 内存。喜欢ThreadLocalRandom.current().nextInt(0, 5000)