Apache Beam TextIO 不适用于 Spark Runner
Apache Beam TextIO does not work with Spark Runner
我正在尝试 运行 我在 Spark 上的 Beam 代码用于 POC。我正在 运行 将应用程序连接到 Google Cloud Dataproc 进行测试。从 PubSub 主题读取并将消息写入 Google Cloud Storage 上的存储桶是一个非常简单的测试。 Dataproc 集群具有适用于 Spark 的正确版本,并且可以访问其他 GCP API。
我也尝试使用 FileIO,但这也不起作用。我尝试发布到另一个 PubSub 主题而不是写作,这很有效,但这不是我的用例。我在使用 TextIO 写入之前尝试打印,并确认我可以从 PubSub 读取消息。
这是管道:
PCollection<String> messages = pipeline
.apply(PubsubIO.readStrings().fromSubscription(sub))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));
messages.apply(TextIO.write().to("gs://...").withNumShards(1).withWindowedWrites());
pipeline.run();
我没有看到有关 Dataproc 作业输出的任何日志。没有错误或任何东西。桶上也没有文件。
我发现这是触发的问题。下面是详细讨论:
https://lists.apache.org/thread.html/a831da3cd74159bf0e0f3fe77363b022cde943ba40c6ab68bb33d5bb@%3Cuser.beam.apache.org%3E
我通过将窗口转换更改为提前触发触发器来解决此问题:
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.alignedTo(Duration.standardSeconds(10))))
.withAllowedLateness(Duration.standardSeconds(10))
.discardingFiredPanes())
我正在尝试 运行 我在 Spark 上的 Beam 代码用于 POC。我正在 运行 将应用程序连接到 Google Cloud Dataproc 进行测试。从 PubSub 主题读取并将消息写入 Google Cloud Storage 上的存储桶是一个非常简单的测试。 Dataproc 集群具有适用于 Spark 的正确版本,并且可以访问其他 GCP API。
我也尝试使用 FileIO,但这也不起作用。我尝试发布到另一个 PubSub 主题而不是写作,这很有效,但这不是我的用例。我在使用 TextIO 写入之前尝试打印,并确认我可以从 PubSub 读取消息。
这是管道:
PCollection<String> messages = pipeline
.apply(PubsubIO.readStrings().fromSubscription(sub))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));
messages.apply(TextIO.write().to("gs://...").withNumShards(1).withWindowedWrites());
pipeline.run();
我没有看到有关 Dataproc 作业输出的任何日志。没有错误或任何东西。桶上也没有文件。
我发现这是触发的问题。下面是详细讨论:
https://lists.apache.org/thread.html/a831da3cd74159bf0e0f3fe77363b022cde943ba40c6ab68bb33d5bb@%3Cuser.beam.apache.org%3E
我通过将窗口转换更改为提前触发触发器来解决此问题:
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.alignedTo(Duration.standardSeconds(10))))
.withAllowedLateness(Duration.standardSeconds(10))
.discardingFiredPanes())