读取多个 .gz 文件并识别哪一行属于哪个文件

Reading multiple .gz file and identifying which row belongs to which file

我正在读取多个 .gz 文件以使用 google 数据流进行处理。数据的最终目的地是 BigQuery。 BigQuery table 为 .gz 文件中的 csv 文件中的每一列提供专用列。 BQ table 中还有一列作为 file_name,它给出了该记录所属的文件名。我正在使用 TextIO.Read 读取文件并对其进行 ParDo 转换。在 DoFn 中有一种方法可以识别传入字符串所属的文件名。

我的代码如下所示:

PCollection<String> logs = pipeline.apply(TextIO.Read.named("ReadLines")
                .from("gcs path").withCompressionType(TextIO.CompressionType.AUTO));

PCollection<TableRow> formattedResults = logs.apply(ParDo.named("Format").of(new DoFn<String, TableRow>() {}

更新 1:

我现在正在尝试如下:

        PCollection<String> fileNamesCollection // this is collection of file names
        GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(options.as(GcsOptions.class));
        PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Format").of(new DoFn<String, KV<String,String>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void processElement(ProcessContext c) throws Exception {
                    ReadableByteChannel readChannel = channelFactory.open(c.element());
                    GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
                    BufferedReader br = new BufferedReader(new InputStreamReader(gzip));

                    String line = null;
                    while ((line = br.readLine()) != null) {
                        c.output(KV.of(c.element(), line));
                    }
                }
        }));

但是当我 运行 这个程序得到 channelFactory 不可序列化时,我有任何通道工厂正在实现 Serializable 接口并且可以在这里使用。

更新2:我终于可以执行程序并成功提交作业了。感谢 jkff 的帮助。 下面是我的最终代码,我把它贴在这里,这样对其他人也有帮助。

        ProcessLogFilesOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(ProcessLogFilesOptions.class); // ProcessLogFilesOptions is a custom class
        DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
        loggingOptions.setDefaultWorkerLogLevel(Level.WARN);

        String jobName = "unique_job_name";
        options.as(BlockingDataflowPipelineOptions.class).setJobName(jobName);

        Pipeline pipeline = Pipeline.create(options);

        List<String> filesToProcess = new ArrayList<String>();
        for(String fileName : fileNameWithoutHrAndSuffix) { // fileNameWithoutHrAndSuffix has elements like Log_20160921,Log_20160922 etc
            filesToProcess.addAll((new GcsIOChannelFactory(options.as(GcsOptions.class))).match(LogDestinationStoragePath+fileName));
        }
        // at this time filesToProcess will have all logs files name as Log_2016092101.gz,Log_2016092102.gz,.........,Log_2016092201.gz,Log_2016092223.gz
        PCollection<String> fileNamesCollection = pipeline.apply(Create.of(filesToProcess));

        PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Parsing_Files").of(new DoFn<String, KV<String,String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void processElement(ProcessContext c) throws Exception {
                    // I have to create _options here because Options, GcsIOChannelFactory are non serializable
                    ProcessLogFilesOptions _options = PipelineOptionsFactory.as(ProcessLogFilesOptions.class);
                    GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(_options.as(GcsOptions.class));
                    ReadableByteChannel readChannel = channelFactory.open(c.element());
                    GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
                    BufferedReader br = new BufferedReader(new InputStreamReader(gzip));

                    String line = null;
                    while ((line = br.readLine()) != null) {
                        c.output(KV.of(c.element(), line));
                    }

                    br.close();
                    gzip.close();
                    readChannel.close();
                }
        }));

        // Performing reshuffling here as suggested
        PCollection <KV<String,String>> withFileName = kv.apply(Reshuffle.<String, String>of());

        PCollection<TableRow> formattedResults = withFileName
                .apply(ParDo.named("Generating_TableRow").of(new DoFn<KV<String,String>, TableRow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                    KV<String,String> kv = c.element();
                    String logLine = kv.getValue();
                    String logFileName = kv.getKey();

                    // do further processing as you want here
        }));

        // Finally insert in BQ table the formattedResults

现在,答案是否定的。不幸的是,如果您需要访问文件名,在这种情况下最好的办法是自己实现文件模式扩展和文件解析(作为 ParDo)。您需要牢记以下几点:

  • 确保插入 redistribute right before the parsing ParDo, to prevent excessive fusion
  • 您可以使用 GcsIoChannelFactory 扩展文件模式(参见 this question) and to open a ReadableByteChannel. Use Channels.newInputStream to create an InputStream, then wrap it into Java's standard GZipInputStream and read it line-by-line - see this question for examples. Remember to close the streams 中的示例。

或者,您可以考虑编写自己的 file-based source。但是,在这种特殊情况下(.gz 文件)我不建议这样做,因为 API 主要用于可以从任何偏移量随机访问读取的文件。