Google Cloud Dataflow 中的 FileBasedSource 使用示例

Example of FileBasedSource usage in Google Cloud Dataflow

有人可以 post 一个子类化 FileBasedSource 的简单示例吗?我是 Google Dataflow 的新手,对 Java 非常缺乏经验。我的目标是读取文件,同时将行号作为键,或者根据行号跳过行。

XMLSource 的实现是了解 FileBasedSource 工作原理的一个很好的起点。您的 reader 可能需要这样的东西(其中 readNextLine() 读取到行尾并更新偏移量):

protected void startReading(ReadableByteChannel channel) throws IOException {
  if (getCurrentSource().getMode() == FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE) {
    // If we are not at the beginning of a line, we should ignore the current line.
    if (getCurrentSource().getStartOffset() > 0) {
      SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
      // Start from one character back and read till we find a new line.
      seekChannel.position(seekChannel.position() - 1);
      nextOffset = seekChannel.position() + readNextLine(new ByteArrayOutputStream());
    }
  }
}

我已经创建了一个包含完整 LineIO 示例的要点,它可能比 XMLSource 更简单。