Cloud Dataflow 中的 ETL 和解析 CSV 文件
ETL & Parsing CSV files in Cloud Dataflow
我是云数据流的新手,Java 所以我希望这是一个正确的问题。
我有一个包含 n 列和行的 csv 文件,可以是字符串、整数或时间戳。我是否需要为每一列创建一个新的 PCollection?
我在示例中找到的大部分文档都类似于:
PCollection<String> data = p.apply(TextIO.Read.from("gs://abc/def.csv"));
但对我来说,将整个 csv 文件作为字符串导入没有意义。我在这里缺少什么以及我应该如何设置我的 PCollections?
此示例将创建一个集合,文件中每行包含 1 String
,例如如果文件是:
Alex,28,111-222-3344
Sam,30,555-666-7788
Drew,19,123-45-6789
那么该集合在逻辑上将包含 "Alex,28,111-222-3344"
、"Sam,30,555-666-7788"
和 "Drew,19,123-45-6789"
。您可以通过 ParDo
或 MapElements
转换管道集合,在 Java 中应用进一步的解析代码,例如:
class User {
public String name;
public int age;
public String phone;
}
PCollection<String> lines = p.apply(TextIO.Read.from("gs://abc/def.csv"));
PCollection<User> users = lines.apply(MapElements.via((String line) -> {
User user = new User();
String[] parts = line.split(",");
user.name = parts[0];
user.age = Integer.parseInt(parts[1]);
user.phone = parts[2];
return user;
}).withOutputType(new TypeDescriptor<User>() {});)
line.split(",");
String.split 没有意义,如果行数据是这样的:
a,b,c,"we,have a string contains comma",d,e
处理csv数据的属性方法是导入一个csv库:
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>3.7</version>
</dependency>
并在 ParDo 中使用以下代码:
public void processElement(ProcessContext c) throws IOException {
String line = c.element();
CSVParser csvParser = new CSVParser();
String[] parts = csvParser.parseLine(line);
}
我是云数据流的新手,Java 所以我希望这是一个正确的问题。
我有一个包含 n 列和行的 csv 文件,可以是字符串、整数或时间戳。我是否需要为每一列创建一个新的 PCollection?
我在示例中找到的大部分文档都类似于:
PCollection<String> data = p.apply(TextIO.Read.from("gs://abc/def.csv"));
但对我来说,将整个 csv 文件作为字符串导入没有意义。我在这里缺少什么以及我应该如何设置我的 PCollections?
此示例将创建一个集合,文件中每行包含 1 String
,例如如果文件是:
Alex,28,111-222-3344
Sam,30,555-666-7788
Drew,19,123-45-6789
那么该集合在逻辑上将包含 "Alex,28,111-222-3344"
、"Sam,30,555-666-7788"
和 "Drew,19,123-45-6789"
。您可以通过 ParDo
或 MapElements
转换管道集合,在 Java 中应用进一步的解析代码,例如:
class User {
public String name;
public int age;
public String phone;
}
PCollection<String> lines = p.apply(TextIO.Read.from("gs://abc/def.csv"));
PCollection<User> users = lines.apply(MapElements.via((String line) -> {
User user = new User();
String[] parts = line.split(",");
user.name = parts[0];
user.age = Integer.parseInt(parts[1]);
user.phone = parts[2];
return user;
}).withOutputType(new TypeDescriptor<User>() {});)
line.split(",");
String.split 没有意义,如果行数据是这样的:
a,b,c,"we,have a string contains comma",d,e
处理csv数据的属性方法是导入一个csv库:
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>3.7</version>
</dependency>
并在 ParDo 中使用以下代码:
public void processElement(ProcessContext c) throws IOException {
String line = c.element();
CSVParser csvParser = new CSVParser();
String[] parts = csvParser.parseLine(line);
}