在 Apache Beam 中读取 CSV 文件时跳过 header

skip header while reading a CSV file in Apache Beam

我想跳过 CSV 文件中的 header 行。截至目前,我在将 header 加载到 google 存储之前手动删除它。

下面是我的代码:

PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] strArr = c.element().split(",");
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr[0]);
            fin.setCatlibCode(strArr[1]);
            fin.set_rNR_(Double.valueOf(strArr[2]));
            fin.set_rNCS_(Double.valueOf(strArr[3]));
            fin.set_rCtb_(Double.valueOf(strArr[4]));
            fin.set_rAC_(Double.valueOf(strArr[5]));
            c.output(fin);
        }
    }));

我已经检查了 Whosebug 中的现有问题,但我认为它没有希望:跳过 header 行 - Cloud DataFlow 是否可行?

有什么帮助吗?

编辑 : 我试过类似下面的方法并且有效 :

PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       

    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {  
            String[] strArr2 = c.element().split(",");
            String header = Arrays.toString(strArr2);
            ClassFinance fin = new ClassFinance();

                if(header.contains("Beneficiary"))
                System.out.println("Header");
                else {
            fin.setBeneficiaryFinance(strArr2[0].trim());
            fin.setCatlibCode(strArr2[1].trim());
            fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
            fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
            fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
            fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
            c.output(fin);
            }
        }
    }));

您分享的较旧的 Stack Overflow post (Skipping header rows - is it possible with Cloud DataFlow?) 包含您问题的答案。

此选项在 Apache Beam SDK 中目前不可用,尽管在 Apache Beam JIRA 问题中有一个开放的功能请求跟踪器,BEAM-123。请注意,在撰写本文时,此功能请求仍处于打开状态且未得到解决,而且这种情况已经持续了 2 年。但是,从这个意义上说,似乎正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您随时了解该 JIRA 问题的最新情况,因为它最后移到了 sdk-java-core 组件,它可能会在那里得到更多的关注。

考虑到这些信息,我会说您使用的方法(在将文件上传到 GCS 之前删除 header)是最适合您的选择。我会避免手动执行此操作,因为您可以轻松编写脚本并自动执行 remove headerupload file 过程。


编辑:

我已经能够使用 DoFn 想出一个简单的过滤器。它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,您可以根据自己的需要对其进行调整。它要求您事先知道正在上传的 CSV 文件的 header(因为它将按元素内容过滤),但同样,将其作为您可以根据需要修改的模板:

public class RemoveCSVHeader {
  // The Filter class
  static class FilterCSVHeaderFn extends DoFn<String, String> {
    String headerFilter;

    public FilterCSVHeaderFn(String headerFilter) {
      this.headerFilter = headerFilter;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String row = c.element();
      // Filter out elements that match the header
      if (!row.equals(this.headerFilter)) {
        c.output(row);
      }
    }
  }

  // The main class
  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));

    String header = "col1,col2,col3,col4";

    vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
        .apply(TextIO.write().to("out"));

    p.run().waitUntilFinish();
  }
}

https://medium.com/@baranitharan/the-textio-write-1be1c07fbef0 TextIO.Write in Dataflow now has withHeader 函数向数据添加 header 行。此功能是在版本 1.7.0.

中添加的

因此您可以像这样向您的 csv 添加 header:

TextIO.Write.named("WriteToText")
            .to("/path/to/the/file")
            .withHeader("col_name1,col_name2,col_name3,col_name4")
            .withSuffix(".csv"));

withHeader 函数会自动在 header 行的末尾添加一个换行符。

这段代码对我有用。我已经使用 Filter.by() 从 csv 文件中过滤掉 header 行。

static void run(GcsToDbOptions options) {

Pipeline p = Pipeline.create(options);
// Read the CSV file from GCS input file path
p.apply("Read Rows from " + options.getInputFile(), TextIO.read()
    .from(options.getInputFile()))
    // filter the header row
    .apply("Remove header row",
        Filter.by((String row) -> !((row.startsWith("dwid") || row.startsWith("\"dwid\"")
            || row.startsWith("'dwid'")))))
    // write the rows to database using prepared statement
    .apply("Write to Auths Table in Postgres", JdbcIO.<String>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource(options)))
        .withStatement(INSERT_INTO_MYTABLE)
        .withPreparedStatementSetter(new StatementSetter()));
PipelineResult result = p.run();
try {
  result.getState();
  result.waitUntilFinish();
} catch (UnsupportedOperationException e) {
  // do nothing
} catch (Exception e) {
  e.printStackTrace();
}}