在 Apache Beam 中读取 CSV 文件时跳过 header
skip header while reading a CSV file in Apache Beam
我想跳过 CSV 文件中的 header 行。截至目前,我在将 header 加载到 google 存储之前手动删除它。
下面是我的代码:
PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr = c.element().split(",");
ClassFinance fin = new ClassFinance();
fin.setBeneficiaryFinance(strArr[0]);
fin.setCatlibCode(strArr[1]);
fin.set_rNR_(Double.valueOf(strArr[2]));
fin.set_rNCS_(Double.valueOf(strArr[3]));
fin.set_rCtb_(Double.valueOf(strArr[4]));
fin.set_rAC_(Double.valueOf(strArr[5]));
c.output(fin);
}
}));
我已经检查了 Whosebug 中的现有问题,但我认为它没有希望:跳过 header 行 - Cloud DataFlow 是否可行?
有什么帮助吗?
编辑 : 我试过类似下面的方法并且有效 :
PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr2 = c.element().split(",");
String header = Arrays.toString(strArr2);
ClassFinance fin = new ClassFinance();
if(header.contains("Beneficiary"))
System.out.println("Header");
else {
fin.setBeneficiaryFinance(strArr2[0].trim());
fin.setCatlibCode(strArr2[1].trim());
fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
c.output(fin);
}
}
}));
您分享的较旧的 Stack Overflow post (Skipping header rows - is it possible with Cloud DataFlow?) 包含您问题的答案。
此选项在 Apache Beam SDK 中目前不可用,尽管在 Apache Beam JIRA 问题中有一个开放的功能请求跟踪器,BEAM-123。请注意,在撰写本文时,此功能请求仍处于打开状态且未得到解决,而且这种情况已经持续了 2 年。但是,从这个意义上说,似乎正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您随时了解该 JIRA 问题的最新情况,因为它最后移到了 sdk-java-core
组件,它可能会在那里得到更多的关注。
考虑到这些信息,我会说您使用的方法(在将文件上传到 GCS 之前删除 header)是最适合您的选择。我会避免手动执行此操作,因为您可以轻松编写脚本并自动执行 remove header ⟶ upload file 过程。
编辑:
我已经能够使用 DoFn
想出一个简单的过滤器。它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,您可以根据自己的需要对其进行调整。它要求您事先知道正在上传的 CSV 文件的 header(因为它将按元素内容过滤),但同样,将其作为您可以根据需要修改的模板:
public class RemoveCSVHeader {
// The Filter class
static class FilterCSVHeaderFn extends DoFn<String, String> {
String headerFilter;
public FilterCSVHeaderFn(String headerFilter) {
this.headerFilter = headerFilter;
}
@ProcessElement
public void processElement(ProcessContext c) {
String row = c.element();
// Filter out elements that match the header
if (!row.equals(this.headerFilter)) {
c.output(row);
}
}
}
// The main class
public static void main(String[] args) throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));
String header = "col1,col2,col3,col4";
vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
.apply(TextIO.write().to("out"));
p.run().waitUntilFinish();
}
}
https://medium.com/@baranitharan/the-textio-write-1be1c07fbef0
TextIO.Write in Dataflow now has withHeader 函数向数据添加 header 行。此功能是在版本 1.7.0.
中添加的
因此您可以像这样向您的 csv 添加 header:
TextIO.Write.named("WriteToText")
.to("/path/to/the/file")
.withHeader("col_name1,col_name2,col_name3,col_name4")
.withSuffix(".csv"));
withHeader 函数会自动在 header 行的末尾添加一个换行符。
这段代码对我有用。我已经使用 Filter.by() 从 csv 文件中过滤掉 header 行。
static void run(GcsToDbOptions options) {
Pipeline p = Pipeline.create(options);
// Read the CSV file from GCS input file path
p.apply("Read Rows from " + options.getInputFile(), TextIO.read()
.from(options.getInputFile()))
// filter the header row
.apply("Remove header row",
Filter.by((String row) -> !((row.startsWith("dwid") || row.startsWith("\"dwid\"")
|| row.startsWith("'dwid'")))))
// write the rows to database using prepared statement
.apply("Write to Auths Table in Postgres", JdbcIO.<String>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource(options)))
.withStatement(INSERT_INTO_MYTABLE)
.withPreparedStatementSetter(new StatementSetter()));
PipelineResult result = p.run();
try {
result.getState();
result.waitUntilFinish();
} catch (UnsupportedOperationException e) {
// do nothing
} catch (Exception e) {
e.printStackTrace();
}}
我想跳过 CSV 文件中的 header 行。截至目前,我在将 header 加载到 google 存储之前手动删除它。
下面是我的代码:
PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr = c.element().split(",");
ClassFinance fin = new ClassFinance();
fin.setBeneficiaryFinance(strArr[0]);
fin.setCatlibCode(strArr[1]);
fin.set_rNR_(Double.valueOf(strArr[2]));
fin.set_rNCS_(Double.valueOf(strArr[3]));
fin.set_rCtb_(Double.valueOf(strArr[4]));
fin.set_rAC_(Double.valueOf(strArr[5]));
c.output(fin);
}
}));
我已经检查了 Whosebug 中的现有问题,但我认为它没有希望:跳过 header 行 - Cloud DataFlow 是否可行?
有什么帮助吗?
编辑 : 我试过类似下面的方法并且有效 :
PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr2 = c.element().split(",");
String header = Arrays.toString(strArr2);
ClassFinance fin = new ClassFinance();
if(header.contains("Beneficiary"))
System.out.println("Header");
else {
fin.setBeneficiaryFinance(strArr2[0].trim());
fin.setCatlibCode(strArr2[1].trim());
fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
c.output(fin);
}
}
}));
您分享的较旧的 Stack Overflow post (Skipping header rows - is it possible with Cloud DataFlow?) 包含您问题的答案。
此选项在 Apache Beam SDK 中目前不可用,尽管在 Apache Beam JIRA 问题中有一个开放的功能请求跟踪器,BEAM-123。请注意,在撰写本文时,此功能请求仍处于打开状态且未得到解决,而且这种情况已经持续了 2 年。但是,从这个意义上说,似乎正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您随时了解该 JIRA 问题的最新情况,因为它最后移到了 sdk-java-core
组件,它可能会在那里得到更多的关注。
考虑到这些信息,我会说您使用的方法(在将文件上传到 GCS 之前删除 header)是最适合您的选择。我会避免手动执行此操作,因为您可以轻松编写脚本并自动执行 remove header ⟶ upload file 过程。
编辑:
我已经能够使用 DoFn
想出一个简单的过滤器。它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,您可以根据自己的需要对其进行调整。它要求您事先知道正在上传的 CSV 文件的 header(因为它将按元素内容过滤),但同样,将其作为您可以根据需要修改的模板:
public class RemoveCSVHeader {
// The Filter class
static class FilterCSVHeaderFn extends DoFn<String, String> {
String headerFilter;
public FilterCSVHeaderFn(String headerFilter) {
this.headerFilter = headerFilter;
}
@ProcessElement
public void processElement(ProcessContext c) {
String row = c.element();
// Filter out elements that match the header
if (!row.equals(this.headerFilter)) {
c.output(row);
}
}
}
// The main class
public static void main(String[] args) throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));
String header = "col1,col2,col3,col4";
vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
.apply(TextIO.write().to("out"));
p.run().waitUntilFinish();
}
}
https://medium.com/@baranitharan/the-textio-write-1be1c07fbef0 TextIO.Write in Dataflow now has withHeader 函数向数据添加 header 行。此功能是在版本 1.7.0.
中添加的因此您可以像这样向您的 csv 添加 header:
TextIO.Write.named("WriteToText")
.to("/path/to/the/file")
.withHeader("col_name1,col_name2,col_name3,col_name4")
.withSuffix(".csv"));
withHeader 函数会自动在 header 行的末尾添加一个换行符。
这段代码对我有用。我已经使用 Filter.by() 从 csv 文件中过滤掉 header 行。
static void run(GcsToDbOptions options) {
Pipeline p = Pipeline.create(options);
// Read the CSV file from GCS input file path
p.apply("Read Rows from " + options.getInputFile(), TextIO.read()
.from(options.getInputFile()))
// filter the header row
.apply("Remove header row",
Filter.by((String row) -> !((row.startsWith("dwid") || row.startsWith("\"dwid\"")
|| row.startsWith("'dwid'")))))
// write the rows to database using prepared statement
.apply("Write to Auths Table in Postgres", JdbcIO.<String>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource(options)))
.withStatement(INSERT_INTO_MYTABLE)
.withPreparedStatementSetter(new StatementSetter()));
PipelineResult result = p.run();
try {
result.getState();
result.waitUntilFinish();
} catch (UnsupportedOperationException e) {
// do nothing
} catch (Exception e) {
e.printStackTrace();
}}