如何将 csv 字符串转换为 Spark-ML 兼容的 Dataset<Row> 格式?
How to transform a csv string into a Spark-ML compatible Dataset<Row> format?
我有一个 Dataset<Row> df
,它包含 string
类型的两列("key" 和 "value")。 df.printSchema();给我以下输出:
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
value 列的内容实际上是一个 csv 格式的行(来自 kafka 主题),该行的最后一个条目代表 class 标签,之前的所有条目都是特征(第一个行未包含在数据集中):
feature0,feature1,label
0.6720004294237854,-0.4033586564886893,0
0.6659082469383558,0.07688976580256132,0
0.8086502311695247,0.564354801275521,1
由于我想在此数据上训练一个 classifier,我需要将此表示转换为一行密集向量类型,包含所有特征值和一列双精度类型,包含标签值:
root
|-- indexedFeatures: vector (nullable = false)
|-- indexedLabel: double (nullable = false)
如何使用 java 1.8 和 Spark 2.2.0 执行此操作?
编辑:我走得更远,但在尝试使其使用灵活数量的特征维度时,我再次陷入困境。我创建了一个
你有不同的方法来实现这一点。
根据您的 CSV 文件创建架构。
public class CSVData implements Serializable {
String col1;
String col2;
long col3;
String col4;
//getters and setters
}
然后将文件转换为RDD。
JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path-to-csv-file");
JavaSQLContext sqlContext = new JavaSQLContext(sc);
JavaRDD<Record> csv_rdd = sc.textFile(data).map(
new Function<String, Record>() {
public Record call(String line) throws Exception {
String[] fields = line.split(",");
Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
return sd;
}
});
或
创建 Spark 会话以将文件作为数据集读取。
SparkSession spark = SparkSession
.builder()
.appName("SparkSample")
.master("local[*]")
.getOrCreate();
//Read file
Dataset<Row> ds = spark.read().text("path-to-csv-file");
or
Dataset<Row> ds = spark.read().csv("path-to-csv-file");
ds.show();
一个VectorAssembler (javadocs)可以将数据集转换成需要的格式。
首先,输入分为三列:
Dataset<FeaturesAndLabelData> featuresAndLabelData = inputDf.select("value").as(Encoders.STRING())
.flatMap(s -> {
String[] splitted = s.split(",");
if (splitted.length == 3) {
return Collections.singleton(new FeaturesAndLabelData(
Double.parseDouble(splitted[0]),
Double.parseDouble(splitted[1]),
Integer.parseInt(splitted[2]))).iterator();
} else {
// apply some error handling...
return Collections.emptyIterator();
}
}, Encoders.bean(FeaturesAndLabelData.class));
然后结果由 VectorAssembler 转换:
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[] { "feature1", "feature2" })
.setOutputCol("indexedFeatures");
Dataset<Row> result = assembler.transform(featuresAndLabelData)
.withColumn("indexedLabel", functions.col("label").cast("double"))
.select("indexedFeatures", "indexedLabel");
结果数据框具有所需的格式:
+----------------------------------------+------------+
|indexedFeatures |indexedLabel|
+----------------------------------------+------------+
|[0.6720004294237854,-0.4033586564886893]|0.0 |
|[0.6659082469383558,0.07688976580256132]|0.0 |
|[0.8086502311695247,0.564354801275521] |1.0 |
+----------------------------------------+------------+
root
|-- indexedFeatures: vector (nullable = true)
|-- indexedLabel: double (nullable = true)
FeaturesAndLabelData 是一个简单的 Java bean,用于确保列名正确:
public class FeaturesAndLabelData {
private double feature1;
private double feature2;
private int label;
//getters and setters...
}
我有一个 Dataset<Row> df
,它包含 string
类型的两列("key" 和 "value")。 df.printSchema();给我以下输出:
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
value 列的内容实际上是一个 csv 格式的行(来自 kafka 主题),该行的最后一个条目代表 class 标签,之前的所有条目都是特征(第一个行未包含在数据集中):
feature0,feature1,label
0.6720004294237854,-0.4033586564886893,0
0.6659082469383558,0.07688976580256132,0
0.8086502311695247,0.564354801275521,1
由于我想在此数据上训练一个 classifier,我需要将此表示转换为一行密集向量类型,包含所有特征值和一列双精度类型,包含标签值:
root
|-- indexedFeatures: vector (nullable = false)
|-- indexedLabel: double (nullable = false)
如何使用 java 1.8 和 Spark 2.2.0 执行此操作?
编辑:我走得更远,但在尝试使其使用灵活数量的特征维度时,我再次陷入困境。我创建了一个
你有不同的方法来实现这一点。
根据您的 CSV 文件创建架构。
public class CSVData implements Serializable {
String col1;
String col2;
long col3;
String col4;
//getters and setters
}
然后将文件转换为RDD。
JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path-to-csv-file");
JavaSQLContext sqlContext = new JavaSQLContext(sc);
JavaRDD<Record> csv_rdd = sc.textFile(data).map(
new Function<String, Record>() {
public Record call(String line) throws Exception {
String[] fields = line.split(",");
Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
return sd;
}
});
或
创建 Spark 会话以将文件作为数据集读取。
SparkSession spark = SparkSession
.builder()
.appName("SparkSample")
.master("local[*]")
.getOrCreate();
//Read file
Dataset<Row> ds = spark.read().text("path-to-csv-file");
or
Dataset<Row> ds = spark.read().csv("path-to-csv-file");
ds.show();
一个VectorAssembler (javadocs)可以将数据集转换成需要的格式。
首先,输入分为三列:
Dataset<FeaturesAndLabelData> featuresAndLabelData = inputDf.select("value").as(Encoders.STRING())
.flatMap(s -> {
String[] splitted = s.split(",");
if (splitted.length == 3) {
return Collections.singleton(new FeaturesAndLabelData(
Double.parseDouble(splitted[0]),
Double.parseDouble(splitted[1]),
Integer.parseInt(splitted[2]))).iterator();
} else {
// apply some error handling...
return Collections.emptyIterator();
}
}, Encoders.bean(FeaturesAndLabelData.class));
然后结果由 VectorAssembler 转换:
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[] { "feature1", "feature2" })
.setOutputCol("indexedFeatures");
Dataset<Row> result = assembler.transform(featuresAndLabelData)
.withColumn("indexedLabel", functions.col("label").cast("double"))
.select("indexedFeatures", "indexedLabel");
结果数据框具有所需的格式:
+----------------------------------------+------------+
|indexedFeatures |indexedLabel|
+----------------------------------------+------------+
|[0.6720004294237854,-0.4033586564886893]|0.0 |
|[0.6659082469383558,0.07688976580256132]|0.0 |
|[0.8086502311695247,0.564354801275521] |1.0 |
+----------------------------------------+------------+
root
|-- indexedFeatures: vector (nullable = true)
|-- indexedLabel: double (nullable = true)
FeaturesAndLabelData 是一个简单的 Java bean,用于确保列名正确:
public class FeaturesAndLabelData {
private double feature1;
private double feature2;
private int label;
//getters and setters...
}