Apache Spark 使用 Java 从 CSV 读取数组浮点数

Apache Spark read array float from CSV using Java

我正在使用 Java 处理一个新的 Spark 项目。我必须从 CSV 文件中读取一些数据,这些 CSV 有一个浮点数组,我不知道如何在我的数据集中获取这个数组。

我正在阅读此 CSV:

[CSV data image][1] https://imgur.com/a/PdrMhev

我正在尝试以这种方式获取数据:

Dataset<Row> typedTrainingData = sparkSession.sql("SELECT CAST(IDp as String) IDp, CAST(Instt as String) Instt, CAST(dataVector as String) dataVector FROM TRAINING_DATA");

我明白了:

root
 |-- IDp: string (nullable = true)
 |-- Instt: string (nullable = true)
 |-- dataVector: string (nullable = true)

+-------+-------------+-----------------+
|    IDp|        Instt|       dataVector|
+-------+-------------+-----------------+
|    p01|      V11apps|-0.41,-0.04,0.1..|
|    p02|      V21apps|-1.50,-1.50,-1...|
+-------+-------------+-----------------+

如您在架构中所见,我将数组作为字符串读取,但我想获取为数组。推荐?

我想在此加载的数据中使用 MLlib 的一些机器学习算法,因此我想将数据作为数组获取。

谢谢大家!!!!!!!!!

首先定义您的架构,

StructType customStructType = new StructType();
        customStructType = customStructType.add("_c0", DataTypes.StringType, false);
        customStructType = customStructType.add("_c1", DataTypes.StringType, false);
        customStructType = customStructType.add("_c2", DataTypes.createArrayType(DataTypes.LongType), false);

然后您可以将 df 映射到新架构,

    Dataset<Row> newDF = oldDF.map((MapFunction<Row, Row>) row -> {

        String strings[] = row.getString(3).split(","); 
        long[] result = new long[strings.length];
        for (int i = 0; i < strings.length; i++)
        result[i] = Long.parseLong(strings[i]);

        return RowFactory.create(row.getString(0),row.getString(1),result);
    }, RowEncoder.apply(customStructType));