Cloud Dataflow:读取整个文本文件而不是逐行读取

Cloud Dataflow: reading entire text files rather than lines by line

我正在寻找一种读取整个文件的方法,以便将每个文件完全读取为一个字符串。 我想在 gs://my_bucket/*/*.json 上传递 JSON 文本文件的模式,让 ParDo 然后完全处理每个文件。

最好的方法是什么?

我将给出最普遍有用的答案,即使在某些特殊情况 [1] 中您可能会采取不同的做法。

我想你要做的是定义一个新的FileBasedSource子类并使用Read.from(<source>)。您的来源还将包括 FileBasedReader 的子类; source 包含配置数据,reader 实际上进行读取。

我认为 API 的完整描述最好留给 Javadoc,但我会强调关键的覆盖点以及它们与您的需求的关系:

  • FileBasedSource#isSplittable() 您将要覆盖 return false。这将表明没有文件内拆分。
  • FileBasedSource#createForSubrangeOfFile(String, long, long) 您将覆盖 return 仅指定文件的子源。
  • FileBasedSource#createSingleFileReader() 您将重写为当前文件生成 FileBasedReader(该方法应假设它已经拆分为单个文件的级别)。

实施reader:

  • FileBasedReader#startReading(...) 你会重写什么都不做;框架已经为您打开了文件,它会关闭它。
  • FileBasedReader#readNextRecord() 您将覆盖以将整个文件作为单个元素读取。

[1] 一个简单的特殊情况示例是当您实际上只有少量文件时,您可以在提交作业之前展开它们,并且它们都需要相同的时间来处理。然后你可以只使用 Create.of(expand(<glob>)) 然后是 ParDo(<read a file>).

我自己也在寻找类似的解决方案。根据 Kenn 的建议和其他一些参考资料(例如 XMLSource.java),创建了以下似乎工作正常的自定义源。

我不是开发人员,所以如果有人对如何改进它有任何建议,请随时贡献。

public class FileIO {
// Match TextIO.
public static Read.Bounded<KV<String,String>> readFilepattern(String filepattern) {
    return Read.from(new FileSource(filepattern, 1));
}

public static class FileSource extends FileBasedSource<KV<String,String>> {
    private String filename = null;

    public FileSource(String fileOrPattern, long minBundleSize) {
        super(fileOrPattern, minBundleSize);
    }

    public FileSource(String filename, long minBundleSize, long startOffset, long endOffset) {
        super(filename, minBundleSize, startOffset, endOffset);
        this.filename = filename;
    }

    // This will indicate that there is no intra-file splitting.
    @Override
    public boolean isSplittable(){
        return false;
    }

    @Override
    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
        return false;
    }

    @Override
    public void validate() {}

    @Override
    public Coder<KV<String,String>> getDefaultOutputCoder() {
        return KvCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of());
    }

    @Override
    public FileBasedSource<KV<String,String>> createForSubrangeOfFile(String fileName, long start, long end) {
        return new FileSource(fileName, getMinBundleSize(), start, end);
    }

    @Override
    public FileBasedReader<KV<String,String>> createSingleFileReader(PipelineOptions options) {
        return new FileReader(this);
    }
}

/**
 * A reader that should read entire file of text from a {@link FileSource}.
 */
private static class FileReader extends FileBasedSource.FileBasedReader<KV<String,String>> {
    private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
    private ReadableByteChannel channel = null;
    private long nextOffset = 0;
    private long currentOffset = 0;
    private boolean isAtSplitPoint = false;
    private final ByteBuffer buf;
    private static final int BUF_SIZE = 1024;
    private KV<String,String> currentValue = null;
    private String filename;

    public FileReader(FileSource source) {
        super(source);
        buf = ByteBuffer.allocate(BUF_SIZE);
        buf.flip();
        this.filename = source.filename;
    }

    private int readFile(ByteArrayOutputStream out) throws IOException {
        int byteCount = 0;
        while (true) {
            if (!buf.hasRemaining()) {
                buf.clear();
                int read = channel.read(buf);
                if (read < 0) {
                    break;
                }
                buf.flip();
            }
            byte b = buf.get();
            byteCount++;

            out.write(b);
        }
        return byteCount;
    }

    @Override
    protected void startReading(ReadableByteChannel channel) throws IOException {
        this.channel = channel;
    }

    @Override
    protected boolean readNextRecord() throws IOException {
        currentOffset = nextOffset;

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int offsetAdjustment = readFile(buf);
        if (offsetAdjustment == 0) {
            // EOF
            return false;
        }
        nextOffset += offsetAdjustment;
        isAtSplitPoint = true;
        currentValue = KV.of(this.filename,CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), buf.toByteArray()));
        return true;
    }

    @Override
    protected boolean isAtSplitPoint() {
        return isAtSplitPoint;
    }

    @Override
    protected long getCurrentOffset() {
        return currentOffset;
    }

    @Override
    public KV<String,String> getCurrent() throws NoSuchElementException {
        return currentValue;
    }
}

}

一个更简单的方法是生成文件名列表并编写一个函数来单独处理每个文件。我显示的是 Python,但 Java 类似:

def generate_filenames():
  for shard in xrange(0, 300):
    yield 'gs://bucket/some/dir/myfilname-%05d-of-00300' % shard

with beam.Pipeline(...) as p:
  (p | generate_filenames()
     | beam.FlatMap(lambda filename: readfile(filename))
     | ...)

FileIO 会为您完成这些工作,而无需实现您自己的 FileBasedSource。

为您要阅读的每个文件创建匹配项:

mypipeline.apply("Read files from GCS", FileIO.match().filepattern("gs://mybucket/myfilles/*.txt"))

此外,如果您不希望 Dataflow 在未找到适合您的 filePattern 的文件时抛出异常,您可以这样阅读:

mypipeline.apply("Read files from GCS", FileIO.match().filepattern("gs://mybucket/myfilles/*.txt").withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))

使用 FileIO 读取匹配项:

.apply("Read file matches", FileIO.readMatches())

以上代码 returns 一个 FileIO.ReadableFile 类型的 PCollection (PCollection)。然后创建一个 DoFn 来处理这些 ReadableFiles 以满足您的用例。

.apply("Process my files", ParDo.of(MyCustomDoFnToProcessFiles.create()))

您可以阅读 FileIO 的完整文档 here