withHintMatchesManyFiles 真的可以在读取大量文件时提高 TextIO 的性能吗?
Can withHintMatchesManyFiles really improve the performance of TextIO when reading large number of files?
在这个中,我们知道
PCollection<String> lines = p.apply(TextIO.read()
.from("gs://some-bucket/many/files/*")
.withHintMatchesManyFiles());
Using this hint causes the transforms to execute in a way optimized for reading a large
number of files: the number of files that can be read in this case is practically unlimited,
and most likely the pipeline will run faster, cheaper and more reliably than without this hint.
但是pipeline这一步卡在下面代码
PCollection<String> lines = pipeline.apply("readDataFromGCS",
TextIO.read().from(sourcePath + "/prefix*")
.withHintMatchesManyFiles()
.watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));
并且每分钟大约有 10 ~ 30MB 的新文件上传到 GCS。
但是,我们尝试从 GCS in pub/sub 读取文件,管道可以正常工作。
raw_event = p.apply("Read Sub Message", PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply("Extract File Name", ParDo.of(new ExtractFileNameFn()))
.apply("Read file matchall", FileIO.matchAll())
.apply("Read file match", FileIO.readMatches())
.apply("Read file", TextIO.readFiles());
我在这里遗漏了什么吗?或者有没有其他方法可以更有效地从 GCS 读取大量文件?
我的管道的工作流程是从 GCS 读取数据,并在数据处理后接收到 Pub/Sub。
光束版本:2.16.0
当您尝试通过 Dataflow 使用 TextIO.read() 读取 zipped/compressed 文件时,压缩文件只能由单个工作程序和该工作程序的单个线程解压缩。这会导致您的管道等待单个工作人员解压缩所有数据,因此,系统会输出一条警告消息,指出您的管道被卡住了,但实际上,管道没有被卡住,只是在尝试解压缩您的数据.此时流数据时没有并行解压。
在这个
PCollection<String> lines = p.apply(TextIO.read() .from("gs://some-bucket/many/files/*") .withHintMatchesManyFiles());
Using this hint causes the transforms to execute in a way optimized for reading a large number of files: the number of files that can be read in this case is practically unlimited, and most likely the pipeline will run faster, cheaper and more reliably than without this hint.
但是pipeline这一步卡在下面代码
PCollection<String> lines = pipeline.apply("readDataFromGCS",
TextIO.read().from(sourcePath + "/prefix*")
.withHintMatchesManyFiles()
.watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));
并且每分钟大约有 10 ~ 30MB 的新文件上传到 GCS。
但是,我们尝试从 GCS in pub/sub 读取文件,管道可以正常工作。
raw_event = p.apply("Read Sub Message", PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply("Extract File Name", ParDo.of(new ExtractFileNameFn()))
.apply("Read file matchall", FileIO.matchAll())
.apply("Read file match", FileIO.readMatches())
.apply("Read file", TextIO.readFiles());
我在这里遗漏了什么吗?或者有没有其他方法可以更有效地从 GCS 读取大量文件?
我的管道的工作流程是从 GCS 读取数据,并在数据处理后接收到 Pub/Sub。
光束版本:2.16.0
当您尝试通过 Dataflow 使用 TextIO.read() 读取 zipped/compressed 文件时,压缩文件只能由单个工作程序和该工作程序的单个线程解压缩。这会导致您的管道等待单个工作人员解压缩所有数据,因此,系统会输出一条警告消息,指出您的管道被卡住了,但实际上,管道没有被卡住,只是在尝试解压缩您的数据.此时流数据时没有并行解压。