如何在 Apache Beam 2.4 中替换 withFilenamePolicy?
How to replace withFilenamePolicy in Apache Beam 2.4?
我正在尝试从 Kafka 源中读取数据,按时间戳进行分区并使用 Apache Beam 2.4 写入 GCS。我想为输出文件应用自定义 FilenamePolicy
。
根据我在 Whosebug 和谷歌上的发现,这在过去是可能的,方法是使用
.apply(TextIO.write()
.to("gs://somebucket/")
.withFilenamePolicy(new PerWindowFiles(prefix))
.withWindowedWrites()
.withNumShards(1));
withFilenamePolicy
选项不再可用。在 Beam 2.4 中是如何完成的?
我尝试使用 documentation 示例中 FileIO
的 writeDynamic()
功能 - 但我不明白为什么我的 TextIO
不被接受作为输入:
withFilenamePolicy()
已在 2.2
中删除
您现在可以使用更简单的语法编写示例
pipeline.apply(Create.of(...))
.apply(TextIO.write()
.to(new PerWindowFiles("gs://somebucket/"))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible("gs://somebucket/tmp"))
.withWindowedWrites()
.withNumShards(1));
N.B。使用自定义 FileNamePolicy,您还需要明确指定 withTempDirectory
.
在您的第二个(屏幕截图)示例中,您使用的是默认 TextIO.sink()
,这是一个 FileIO.Sink<String>
来下沉 Event
s。您需要 Sink<Event>
的实例(它也将实现任何自定义文件命名)或像这样用 Contextful
包装您的 Event::getPayload
:
.apply(FileIO.<String, Event>writeDynamic()
.by(Event.getEventType)
.via(Contextful.fn(Event::getPayload))
我正在尝试从 Kafka 源中读取数据,按时间戳进行分区并使用 Apache Beam 2.4 写入 GCS。我想为输出文件应用自定义 FilenamePolicy
。
根据我在 Whosebug 和谷歌上的发现,这在过去是可能的,方法是使用
.apply(TextIO.write()
.to("gs://somebucket/")
.withFilenamePolicy(new PerWindowFiles(prefix))
.withWindowedWrites()
.withNumShards(1));
withFilenamePolicy
选项不再可用。在 Beam 2.4 中是如何完成的?
我尝试使用 documentation 示例中 FileIO
的 writeDynamic()
功能 - 但我不明白为什么我的 TextIO
不被接受作为输入:
withFilenamePolicy()
已在 2.2
您现在可以使用更简单的语法编写示例
pipeline.apply(Create.of(...))
.apply(TextIO.write()
.to(new PerWindowFiles("gs://somebucket/"))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible("gs://somebucket/tmp"))
.withWindowedWrites()
.withNumShards(1));
N.B。使用自定义 FileNamePolicy,您还需要明确指定 withTempDirectory
.
在您的第二个(屏幕截图)示例中,您使用的是默认 TextIO.sink()
,这是一个 FileIO.Sink<String>
来下沉 Event
s。您需要 Sink<Event>
的实例(它也将实现任何自定义文件命名)或像这样用 Contextful
包装您的 Event::getPayload
:
.apply(FileIO.<String, Event>writeDynamic()
.by(Event.getEventType)
.via(Contextful.fn(Event::getPayload))