如何为 Dataflow 作业指定多个输入路径

How to specify multiple input paths to a Dataflow job

我想 运行 通过来自 Google Cloud Storage 的多个输入进行 Dataflow 作业,但是我想传递给作业的路径不能仅使用 * glob 运算符。

考虑这些路径:

gs://bucket/some/path/20160208/input1
gs://bucket/some/path/20160208/input2
gs://bucket/some/path/20160209/input1
gs://bucket/some/path/20160209/input2
gs://bucket/some/path/20160210/input1
gs://bucket/some/path/20160210/input2
gs://bucket/some/path/20160211/input1
gs://bucket/some/path/20160211/input2
gs://bucket/some/path/20160212/input1
gs://bucket/some/path/20160212/input2

我希望我的工作处理 201602092016021020160211 目录中的文件,但不处理 20160208(第一个)和 20160212(最后一个)。实际上还有很多日期,我希望能够为我的工作指定一个任意范围的日期。

The docs for TextIO.Read 说:

Standard Java Filesystem glob patterns ("*", "?", "[..]") are supported.

但我无法让它工作。有一个 link 到 Java Filesystem glob patterns , which in turn links to getPathMatcher(String),它列出了所有通配选项。其中之一是 {a,b,c},它看起来完全符合我的需要,但是,如果我将 gs://bucket/some/path/201602{09,10,11}/* 传递给 TextIO.Read#from,我会得到 "Unable to expand file pattern".

也许文档的意思是只支持*?[…],如果是这样,怎么能我构建了一个 Dataflow 将接受的 glob,它可以匹配我上面描述的任意日期范围?

更新: 我发现我可以编写一大块代码,这样我就可以将路径前缀作为逗号分隔列表传递,从每个创建一个输入并使用 Flatten 转换,但这似乎是一种非常低效的方法。看起来第一步读取所有输入文件并立即将它们再次写出到 GCS 上的临时位置。只有在读取和写入所有输入后,实际处理才开始。这一步在我正在写的工作中是完全没有必要的。我希望作业读取第一个文件,开始处理它并读取下一个文件,依此类推。 这导致了很多其他问题,我会尝试让它工作,但感觉就像死了一样由于最初的重写而结束。

文档确实意味着仅支持 *?[...]。这意味着按字母顺序或数字顺序排列的任意子集或范围不能表示为单个 glob。

以下是一些可能适合您的方法:

  1. 如果文件路径中表示的日期也存在于文件中的记录中,那么最简单的解决方案是读取所有文件并使用 Filter 转换为 select 日期范围你有兴趣。
  2. 您尝试过的多次读取的方法 TextIO.Read 转换和展平它们对于小文件集是可行的;我们的 tf-idf example 做到了这一点。您 可以 表达具有少量 glob 的任意数值范围,因此这不需要每个文件读取一次(例如,两个字符范围“23 到 67”是 2[3-] 加上[3-5][0-9]加上6[0-7])
  3. 如果文件的子集比较随意,那么globs/filenames的数量可能会超过最大图的大小,最后的建议是将文件列表放入PCollection并使用ParDo 转换为读取每个文件并发出其内容。

希望对您有所帮助!