在作业执行前过滤 GCS uris
Filtering GCS uris before job execution
我有一个无法解决的常见用例。
假设我有一个像 gs://mybucket/mydata/*/files.json
这样的文件模式,其中 * 应该与日期匹配。
假设我想保留 251 个日期(这是一个示例,假设有大量日期但没有元模式来匹配它们,例如 2019* 或其他日期)。
现在,我有两个选择:
- 为每个文件创建一个 TextIO,这太过分了,几乎每次都会失败(图表太大)
- 读取所有数据,然后在我的工作中从数据中过滤它:当您有 10 TB 的数据而您只需要 10 Gb 时,这也太过分了
就我而言,我只想做类似的事情(伪代码):
Read(LIST[uri1,uri2,...,uri251])
而且这条指令实际上在图表上产生了一个 TextIO 任务。
很抱歉,如果我遗漏了什么,但我找不到解决方法。
谢谢
好的,我找到了,这个命名误导了我:
Example 2: reading a PCollection of filenames.
Pipeline p = ...;
// E.g. the filenames might be computed from other data in the pipeline, or
// read from a data source.
PCollection<String> filenames = ...;
// Read all files in the collection.
PCollection<String> lines =
filenames
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(TextIO.readFiles());
(引自 Apache Beam 文档 https://beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/io/TextIO.html)
因此我们需要生成 URIS 的 PCollection(使用 Create/of
)或从管道中读取它,然后匹配所有 uris(或我猜的模式)并读取所有文件。
我有一个无法解决的常见用例。
假设我有一个像 gs://mybucket/mydata/*/files.json
这样的文件模式,其中 * 应该与日期匹配。
假设我想保留 251 个日期(这是一个示例,假设有大量日期但没有元模式来匹配它们,例如 2019* 或其他日期)。 现在,我有两个选择:
- 为每个文件创建一个 TextIO,这太过分了,几乎每次都会失败(图表太大)
- 读取所有数据,然后在我的工作中从数据中过滤它:当您有 10 TB 的数据而您只需要 10 Gb 时,这也太过分了
就我而言,我只想做类似的事情(伪代码):
Read(LIST[uri1,uri2,...,uri251])
而且这条指令实际上在图表上产生了一个 TextIO 任务。 很抱歉,如果我遗漏了什么,但我找不到解决方法。
谢谢
好的,我找到了,这个命名误导了我:
Example 2: reading a PCollection of filenames.
Pipeline p = ...;
// E.g. the filenames might be computed from other data in the pipeline, or
// read from a data source.
PCollection<String> filenames = ...;
// Read all files in the collection.
PCollection<String> lines =
filenames
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(TextIO.readFiles());
(引自 Apache Beam 文档 https://beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/io/TextIO.html)
因此我们需要生成 URIS 的 PCollection(使用 Create/of
)或从管道中读取它,然后匹配所有 uris(或我猜的模式)并读取所有文件。