Flink:声明动态元组大小和类型

Flink: Declaring dynamic tuple size & type

有没有办法动态声明元组中的各种类型?

我找到了一种动态声明元组中列数的方法:

env.readCsvFile(filePath).tupleType(Tuple.getTupleClass(3))

但是没有任何类型参数,它会抛出错误:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.

我想尽可能简单地使用 Tuple 中的所有元素 String。以下作品:

env.readCsvFile(filePath).types(String.class, String.class);

这导致 Tuple2(String,String) 类型。但就我而言,我不知道 csv 中有多少列数据。但我很好地阅读所有列作为字符串。 (我知道最多有 25 列的限制)

我什至尝试通过指定 CsvInputFormat 的子类型来阅读:

env.readFile(new TupleCsvInputFormat(filePath,TypeInformation.of(String.class), filePath);

但是无法编译。不确定如何在我的案例中使用它。我也不确定如何扩展 Tuple class 来实现相同的目标(如果可能的话)。 TypeHint 似乎要求我事先知道列数。

我不确定其他 env.read...() 方法。我尝试了一些,但是像 ignoreFirstLine() 这样的一些方法不可用。它们只带有 CsvReader.

所以,如果列数可以是任意的(通过输入传递),有人可以帮我找出读取 csv 的最佳方法,并将 Tuple 的每个元素作为简单的读取String?

可以编写自己的方法来读取 CSV 文件。也许是这样的:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    int n = 3; // number of columns here
    Class[] types = IntStream.range(0, n).mapToObj(i -> String.class).toArray(Class[]::new);
    DataSet<Tuple> csv = readCsv(env, "filename.csv", types);
    csv.print();
}

private static DataSource<Tuple> readCsv(ExecutionEnvironment env, String filename, Class[] fieldTypes) {
    TupleTypeInfo<Tuple> typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(fieldTypes);
    TupleCsvInputFormat<Tuple> inputFormat = new TupleCsvInputFormat<>(new Path(filename), typeInfo);
    return new DataSource<>(env, inputFormat, typeInfo, Utils.getCallLocationName());
}

注意:此方法跳过调用 CsvReader class 中的 configureInputFormat 方法。如果你需要它,你可以做到。