在 Beam 中写入基于元素的文件名

Write to Element-Based Filename in Beam

我正在尝试将特定文件从 Bucket A 复制到 Bucket B。Bucket A 是结构化的(目录),而 Bucket B 没有目录。挑战在于我需要根据文件的原始文件名来命名我的文件。通常,我会创建自定义文件名策略并根据需要进行修改。但是,我知道访问原始文件名的唯一方法是通过每个元素并提取其元数据。我怎样才能访问 TextIO.write 中的每个元素?

我考虑过在 TextIO.write 之前创建一个转换,它接收元素的 pcollection 并输出 KV 的 pcollection,其中键是原始文件名,值是元素 (similar to this example ).但是,如果我这样做,我的作者怎么知道如何写 KV?

我能够通过在可序列化函数中使用 writedynamic 和按每个元素的文件名进行分区来获得这种工作的 hackkey 方式。然后我可以将分区类型传递给我的文件名策略,进而实现我想要的结果。话虽这么说,这似乎远非高效,也不是为此而设计的,因为我实际上不需要对任何东西进行分区。

这里有一些您可能希望考虑的方法,具体取决于您是尝试一次性复制还是创建某种方式来执行此系统:

如果您只是想复制文件。那么你可能根本不需要数据流。您可以使用 gsutil 来复制文件。

如果您只需要不加修改地复制文件并且仍然想使用数据流,您可以自己在数据流中使用 gsutil。

如果您需要转换每个文件。您可能希望进行对整个文件进行操作的转换,将其完全读入并完全修改,然后将其写出到自定义 ParDo 中。

替代使用 Dataflow。每当创建 GCS 文件时,您都可以使用 google cloud functions to trigger

注意:TextIO 和 FileIO 是基于记录的转换,而不是基于文件的转换。他们将一个文件 appart 拉入记录,以实现并行性。原始文件名和记录顺序并没有真正得到维护。我看到您曾尝试使用 KV 维护文件名,但正如您提到的 FileIO 不允许您为每条记录传递文件名。

当使用 writeDynamic 时,by 方法指定用于将传入数据分区到其相应目的地的标准。例如,如果这是根据 KV 对的密钥决定的,我们可以使用 .by(KV::getKey) 并且目标文件名可以通过 .withNaming.

进行调整

此外,使用 via 方法,我们可以提供一个应用于每个分区的函数,如 here 所述。在这种情况下,我们想使用 select 目标的键,但我们不想将它们写入输出文件。因此,我们可以用.via(Contextful.fn(KV::getValue), TextIO.sink()).

写值,省略键。

by 接受 SerializableFunction 作为参数,而 via 方法需要使用 Contextful<Contextful.Fn<UserT,OutputT>> outputFn。这就是为什么我将 KV::getValue 包装在 Contextful.fn(). In some examples like this template 中,提供上下文(例如所需的侧输入)可能很有用,但在这里我只想传递函数。

代码片段(更多细节

p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2"), KV.of("three", "this is row 3"), KV.of("four", "this is row 4")))
 .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to(output)
    .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));