如何在 Apache Beam 2.4 中替换 withFilenamePolicy?

How to replace withFilenamePolicy in Apache Beam 2.4?

我正在尝试从 Kafka 源中读取数据,按时间戳进行分区并使用 Apache Beam 2.4 写入 GCS。我想为输出文件应用自定义 FilenamePolicy

根据我在 Whosebug 和谷歌上的发现,这在过去是可能的,方法是使用

.apply(TextIO.write()
                    .to("gs://somebucket/")
                    .withFilenamePolicy(new PerWindowFiles(prefix))
                    .withWindowedWrites()
                    .withNumShards(1));

withFilenamePolicy 选项不再可用。在 Beam 2.4 中是如何完成的?

我尝试使用 documentation 示例中 FileIOwriteDynamic() 功能 - 但我不明白为什么我的 TextIO 不被接受作为输入:

withFilenamePolicy() 已在 2.2

中删除

您现在可以使用更简单的语法编写示例

pipeline.apply(Create.of(...))
  .apply(TextIO.write()
    .to(new PerWindowFiles("gs://somebucket/"))
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible("gs://somebucket/tmp"))
    .withWindowedWrites()
    .withNumShards(1));

N.B。使用自定义 FileNamePolicy,您还需要明确指定 withTempDirectory.

在您的第二个(屏幕截图)示例中,您使用的是默认 TextIO.sink(),这是一个 FileIO.Sink<String> 来下沉 Events。您需要 Sink<Event> 的实例(它也将实现任何自定义文件命名)或像这样用 Contextful 包装您的 Event::getPayload

.apply(FileIO.<String, Event>writeDynamic()
  .by(Event.getEventType)
  .via(Contextful.fn(Event::getPayload))