在 Beam 中写入基于元素的文件名
Write to Element-Based Filename in Beam
我正在尝试将特定文件从 Bucket A 复制到 Bucket B。Bucket A 是结构化的(目录),而 Bucket B 没有目录。挑战在于我需要根据文件的原始文件名来命名我的文件。通常,我会创建自定义文件名策略并根据需要进行修改。但是,我知道访问原始文件名的唯一方法是通过每个元素并提取其元数据。我怎样才能访问 TextIO.write 中的每个元素?
我考虑过在 TextIO.write 之前创建一个转换,它接收元素的 pcollection 并输出 KV 的 pcollection,其中键是原始文件名,值是元素 (similar to this example ).但是,如果我这样做,我的作者怎么知道如何写 KV?
我能够通过在可序列化函数中使用 writedynamic 和按每个元素的文件名进行分区来获得这种工作的 hackkey 方式。然后我可以将分区类型传递给我的文件名策略,进而实现我想要的结果。话虽这么说,这似乎远非高效,也不是为此而设计的,因为我实际上不需要对任何东西进行分区。
这里有一些您可能希望考虑的方法,具体取决于您是尝试一次性复制还是创建某种方式来执行此系统:
如果您只是想复制文件。那么你可能根本不需要数据流。您可以使用 gsutil 来复制文件。
如果您只需要不加修改地复制文件并且仍然想使用数据流,您可以自己在数据流中使用 gsutil。
如果您需要转换每个文件。您可能希望进行对整个文件进行操作的转换,将其完全读入并完全修改,然后将其写出到自定义 ParDo 中。
替代使用 Dataflow。每当创建 GCS 文件时,您都可以使用 google cloud functions to trigger。
注意:TextIO 和 FileIO 是基于记录的转换,而不是基于文件的转换。他们将一个文件 appart 拉入记录,以实现并行性。原始文件名和记录顺序并没有真正得到维护。我看到您曾尝试使用 KV 维护文件名,但正如您提到的 FileIO 不允许您为每条记录传递文件名。
当使用 writeDynamic
时,by
方法指定用于将传入数据分区到其相应目的地的标准。例如,如果这是根据 KV 对的密钥决定的,我们可以使用 .by(KV::getKey)
并且目标文件名可以通过 .withNaming
.
进行调整
此外,使用 via
方法,我们可以提供一个应用于每个分区的函数,如 here 所述。在这种情况下,我们想使用 select 目标的键,但我们不想将它们写入输出文件。因此,我们可以用.via(Contextful.fn(KV::getValue), TextIO.sink())
.
写值,省略键。
而 by
接受 SerializableFunction
作为参数,而 via
方法需要使用 Contextful<Contextful.Fn<UserT,OutputT>> outputFn
。这就是为什么我将 KV::getValue
包装在 Contextful.fn()
. In some examples like this template 中,提供上下文(例如所需的侧输入)可能很有用,但在这里我只想传递函数。
代码片段(更多细节)
p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2"), KV.of("three", "this is row 3"), KV.of("four", "this is row 4")))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.to(output)
.withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
我正在尝试将特定文件从 Bucket A 复制到 Bucket B。Bucket A 是结构化的(目录),而 Bucket B 没有目录。挑战在于我需要根据文件的原始文件名来命名我的文件。通常,我会创建自定义文件名策略并根据需要进行修改。但是,我知道访问原始文件名的唯一方法是通过每个元素并提取其元数据。我怎样才能访问 TextIO.write 中的每个元素?
我考虑过在 TextIO.write 之前创建一个转换,它接收元素的 pcollection 并输出 KV 的 pcollection,其中键是原始文件名,值是元素 (similar to this example ).但是,如果我这样做,我的作者怎么知道如何写 KV?
我能够通过在可序列化函数中使用 writedynamic 和按每个元素的文件名进行分区来获得这种工作的 hackkey 方式。然后我可以将分区类型传递给我的文件名策略,进而实现我想要的结果。话虽这么说,这似乎远非高效,也不是为此而设计的,因为我实际上不需要对任何东西进行分区。
这里有一些您可能希望考虑的方法,具体取决于您是尝试一次性复制还是创建某种方式来执行此系统:
如果您只是想复制文件。那么你可能根本不需要数据流。您可以使用 gsutil 来复制文件。
如果您只需要不加修改地复制文件并且仍然想使用数据流,您可以自己在数据流中使用 gsutil。
如果您需要转换每个文件。您可能希望进行对整个文件进行操作的转换,将其完全读入并完全修改,然后将其写出到自定义 ParDo 中。
替代使用 Dataflow。每当创建 GCS 文件时,您都可以使用 google cloud functions to trigger。
注意:TextIO 和 FileIO 是基于记录的转换,而不是基于文件的转换。他们将一个文件 appart 拉入记录,以实现并行性。原始文件名和记录顺序并没有真正得到维护。我看到您曾尝试使用 KV 维护文件名,但正如您提到的 FileIO 不允许您为每条记录传递文件名。
当使用 writeDynamic
时,by
方法指定用于将传入数据分区到其相应目的地的标准。例如,如果这是根据 KV 对的密钥决定的,我们可以使用 .by(KV::getKey)
并且目标文件名可以通过 .withNaming
.
此外,使用 via
方法,我们可以提供一个应用于每个分区的函数,如 here 所述。在这种情况下,我们想使用 select 目标的键,但我们不想将它们写入输出文件。因此,我们可以用.via(Contextful.fn(KV::getValue), TextIO.sink())
.
而 by
接受 SerializableFunction
作为参数,而 via
方法需要使用 Contextful<Contextful.Fn<UserT,OutputT>> outputFn
。这就是为什么我将 KV::getValue
包装在 Contextful.fn()
. In some examples like this template 中,提供上下文(例如所需的侧输入)可能很有用,但在这里我只想传递函数。
代码片段(更多细节
p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2"), KV.of("three", "this is row 3"), KV.of("four", "this is row 4")))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.to(output)
.withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));