Reading multiple .gz files and identifying which row belongs to which file
I am reading multiple .gz files to process them with Google Dataflow. The final destination of the data is BigQuery. The BigQuery table has a dedicated column for every column of the CSV files inside the .gz files. There is one additional column in the BQ table, file_name, which gives the name of the file the record belongs to. I am reading the files with TextIO.Read and performing a ParDo transform on them. Is there a way, inside the DoFn, to identify the name of the file the incoming string belongs to?
My code looks like this:
PCollection<String> logs = pipeline.apply(TextIO.Read.named("ReadLines")
    .from("gcs path").withCompressionType(TextIO.CompressionType.AUTO));
PCollection<TableRow> formattedResults = logs.apply(
    ParDo.named("Format").of(new DoFn<String, TableRow>() { ... }));
Update 1:
I am now trying the following:
PCollection<String> fileNamesCollection; // this is a collection of file names
GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(options.as(GcsOptions.class));
PCollection<KV<String, String>> kv = fileNamesCollection.apply(
    ParDo.named("Format").of(new DoFn<String, KV<String, String>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public void processElement(ProcessContext c) throws Exception {
        ReadableByteChannel readChannel = channelFactory.open(c.element());
        GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
        BufferedReader br = new BufferedReader(new InputStreamReader(gzip));
        String line = null;
        while ((line = br.readLine()) != null) {
          c.output(KV.of(c.element(), line));
        }
      }
    }));
But when I run this program, I get an error saying that channelFactory is not serializable. Is there any channel factory that implements the Serializable interface and can be used here?
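The failure is standard Java serialization at work: an anonymous DoFn captures every local variable it references, so the non-serializable GcsIOChannelFactory gets pulled into the closure that Dataflow tries to ship to workers. A minimal stand-alone sketch (plain Java, no Dataflow classes; ClosureDemo, NotSerializable, and capture are hypothetical names invented for illustration) reproduces the same failure mode:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureDemo {
    // Stands in for GcsIOChannelFactory: a class that does not implement Serializable.
    static class NotSerializable {}

    static Serializable capture(final NotSerializable factory) {
        // The anonymous class references the local variable 'factory',
        // so the compiler makes it a captured field of the instance.
        return new Serializable() {
            private static final long serialVersionUID = 1L;
            Object captured = factory;
        };
    }

    public static void main(String[] args) throws Exception {
        Serializable fn = capture(new NotSerializable());
        try {
            // Same check Dataflow performs when it serializes the DoFn at submit time.
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(fn);
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

The fix (shown in Update 2 below) is to construct the factory inside processElement, so nothing non-serializable is captured by the closure.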
Update 2: I was finally able to execute the program and submit the job successfully. Thanks to jkff for the help.
Below is my final code; I am posting it here so it can help others as well.
ProcessLogFilesOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(ProcessLogFilesOptions.class); // ProcessLogFilesOptions is a custom class
DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
loggingOptions.setDefaultWorkerLogLevel(Level.WARN);
String jobName = "unique_job_name";
options.as(BlockingDataflowPipelineOptions.class).setJobName(jobName);
Pipeline pipeline = Pipeline.create(options);
List<String> filesToProcess = new ArrayList<String>();
for (String fileName : fileNameWithoutHrAndSuffix) { // fileNameWithoutHrAndSuffix has elements like Log_20160921, Log_20160922 etc.
  filesToProcess.addAll((new GcsIOChannelFactory(options.as(GcsOptions.class))).match(LogDestinationStoragePath + fileName));
}
// At this point filesToProcess holds all log file names, e.g.
// Log_2016092101.gz, Log_2016092102.gz, ..., Log_2016092201.gz, Log_2016092223.gz
PCollection<String> fileNamesCollection = pipeline.apply(Create.of(filesToProcess));
PCollection<KV<String, String>> kv = fileNamesCollection.apply(
    ParDo.named("Parsing_Files").of(new DoFn<String, KV<String, String>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public void processElement(ProcessContext c) throws Exception {
        // I have to create _options here because Options and GcsIOChannelFactory are not serializable
        ProcessLogFilesOptions _options = PipelineOptionsFactory.as(ProcessLogFilesOptions.class);
        GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(_options.as(GcsOptions.class));
        ReadableByteChannel readChannel = channelFactory.open(c.element());
        GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
        BufferedReader br = new BufferedReader(new InputStreamReader(gzip));
        String line = null;
        while ((line = br.readLine()) != null) {
          c.output(KV.of(c.element(), line));
        }
        br.close();
        gzip.close();
        readChannel.close();
      }
    }));
// Performing reshuffling here as suggested
PCollection<KV<String, String>> withFileName = kv.apply(Reshuffle.<String, String>of());
PCollection<TableRow> formattedResults = withFileName
    .apply(ParDo.named("Generating_TableRow").of(new DoFn<KV<String, String>, TableRow>() {
      private static final long serialVersionUID = 1L;

      @Override
      public void processElement(ProcessContext c) throws Exception {
        KV<String, String> kv = c.element();
        String logLine = kv.getValue();
        String logFileName = kv.getKey();
        // do further processing as you want here
      }
    }));
// Finally insert formattedResults into the BQ table
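Inside the Generating_TableRow DoFn, the remaining work is splitting each CSV log line and attaching the file name as the extra file_name column. A hedged plain-Java sketch of that step, using a Map in place of BigQuery's TableRow class and assuming a simple comma-separated format (the column names here are hypothetical; substitute the real schema of the log files):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowFormatter {
    // Hypothetical CSV column names; replace with the real schema of the log files.
    static final String[] COLUMNS = {"timestamp", "level", "message"};

    // Builds a TableRow-like map from one CSV line plus the source file name.
    static Map<String, String> toRow(String fileName, String csvLine) {
        String[] fields = csvLine.split(",", -1); // -1 keeps trailing empty fields
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < COLUMNS.length && i < fields.length; i++) {
            row.put(COLUMNS[i], fields[i].trim());
        }
        row.put("file_name", fileName); // the extra column the BQ table expects
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> row =
            RowFormatter.toRow("Log_2016092101.gz", "2016-09-21T01:00:00,INFO,started");
        System.out.println(row);
    }
}
```

In the actual pipeline the same loop would populate a com.google.api.services.bigquery.model.TableRow via its set(column, value) method and pass it to c.output(...).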
Right now, the answer is no. Unfortunately, if you need access to the file names, the best you can do in this case is to implement the filepattern expansion and the file parsing yourself (as a ParDo). You will need to keep the following in mind:
- Make sure to insert a redistribute right before the parsing ParDo, to prevent excessive fusion.
- You can use GcsIoChannelFactory to expand the filepattern (see this question) and to open a ReadableByteChannel. Use Channels.newInputStream to create an InputStream, then wrap it into Java's standard GZIPInputStream and read it line by line - see this question for examples. Remember to close the streams.
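The chain described in the second point (channel → InputStream → GZIPInputStream → BufferedReader) can be sketched in self-contained plain Java; here a local FileInputStream stands in for the GCS channel, and try-with-resources replaces the manual close() calls, which is safer because the streams are closed even if an exception is thrown mid-read:

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLines {
    // Reads all lines of a .gz file; try-with-resources closes every layer.
    static List<String> readLines(String path) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(path)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Write a small .gz file first so the example is self-contained.
        String path = "demo.gz";
        try (Writer w = new OutputStreamWriter(
                new GZIPOutputStream(new FileOutputStream(path)), StandardCharsets.UTF_8)) {
            w.write("row1,a\nrow2,b\n");
        }
        System.out.println(readLines(path)); // prints [row1,a, row2,b]
    }
}
```

In the DoFn itself, new FileInputStream(path) would be replaced by Channels.newInputStream(channelFactory.open(c.element())); the rest of the chain is identical.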
Alternatively, you can consider writing your own file-based source. However, in this particular case (.gz files) I would not recommend it, because that API is primarily intended for files that can be read with random access from any offset.