Flink:声明动态元组大小和类型
Flink: Declaring dynamic tuple size & type
有没有办法动态声明元组中的各种类型?
我找到了一种动态声明元组中列数的方法:
env.readCsvFile(filePath).tupleType(Tuple.getTupleClass(3))
但是没有任何类型参数,它会抛出错误:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
我想尽可能简单地使用 Tuple 中的所有元素 String
。以下作品:
env.readCsvFile(filePath).types(String.class, String.class);
这导致 Tuple2(String,String)
类型。但就我而言,我不知道 csv 中有多少列数据。但我很好地阅读所有列作为字符串。 (我知道最多有 25 列的限制)
我什至尝试通过指定 CsvInputFormat
的子类型来阅读:
env.readFile(new TupleCsvInputFormat(filePath,TypeInformation.of(String.class), filePath);
但是无法编译。不确定如何在我的案例中使用它。我也不确定如何扩展 Tuple class 来实现相同的目标(如果可能的话)。 TypeHint
似乎要求我事先知道列数。
我不确定其他 env.read...()
方法。我尝试了一些,但是像 ignoreFirstLine()
这样的一些方法不可用。它们只带有 CsvReader
.
所以,如果列数可以是任意的(通过输入传递),有人可以帮我找出读取 csv 的最佳方法,并将 Tuple
的每个元素作为简单的读取String
?
可以编写自己的方法来读取 CSV 文件。也许是这样的:
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
int n = 3; // number of columns here
Class[] types = IntStream.range(0, n).mapToObj(i -> String.class).toArray(Class[]::new);
DataSet<Tuple> csv = readCsv(env, "filename.csv", types);
csv.print();
}
private static DataSource<Tuple> readCsv(ExecutionEnvironment env, String filename, Class[] fieldTypes) {
TupleTypeInfo<Tuple> typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(fieldTypes);
TupleCsvInputFormat<Tuple> inputFormat = new TupleCsvInputFormat<>(new Path(filename), typeInfo);
return new DataSource<>(env, inputFormat, typeInfo, Utils.getCallLocationName());
}
注意:此方法跳过调用 CsvReader
class 中的 configureInputFormat
方法。如果你需要它,你可以做到。
有没有办法动态声明元组中的各种类型?
我找到了一种动态声明元组中列数的方法:
env.readCsvFile(filePath).tupleType(Tuple.getTupleClass(3))
但是没有任何类型参数,它会抛出错误:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
我想尽可能简单地使用 Tuple 中的所有元素 String
。以下作品:
env.readCsvFile(filePath).types(String.class, String.class);
这导致 Tuple2(String,String)
类型。但就我而言,我不知道 csv 中有多少列数据。但我很好地阅读所有列作为字符串。 (我知道最多有 25 列的限制)
我什至尝试通过指定 CsvInputFormat
的子类型来阅读:
env.readFile(new TupleCsvInputFormat(filePath,TypeInformation.of(String.class), filePath);
但是无法编译。不确定如何在我的案例中使用它。我也不确定如何扩展 Tuple class 来实现相同的目标(如果可能的话)。 TypeHint
似乎要求我事先知道列数。
我不确定其他 env.read...()
方法。我尝试了一些,但是像 ignoreFirstLine()
这样的一些方法不可用。它们只带有 CsvReader
.
所以,如果列数可以是任意的(通过输入传递),有人可以帮我找出读取 csv 的最佳方法,并将 Tuple
的每个元素作为简单的读取String
?
可以编写自己的方法来读取 CSV 文件。也许是这样的:
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
int n = 3; // number of columns here
Class[] types = IntStream.range(0, n).mapToObj(i -> String.class).toArray(Class[]::new);
DataSet<Tuple> csv = readCsv(env, "filename.csv", types);
csv.print();
}
private static DataSource<Tuple> readCsv(ExecutionEnvironment env, String filename, Class[] fieldTypes) {
TupleTypeInfo<Tuple> typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(fieldTypes);
TupleCsvInputFormat<Tuple> inputFormat = new TupleCsvInputFormat<>(new Path(filename), typeInfo);
return new DataSource<>(env, inputFormat, typeInfo, Utils.getCallLocationName());
}
注意:此方法跳过调用 CsvReader
class 中的 configureInputFormat
方法。如果你需要它,你可以做到。