将 Spark 数据帧转换为 org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
Convert Spark Data Frame to org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
我是 scala 和 spark 2.1 的新手。
我正在尝试计算数据框中许多元素之间的相关性,如下所示:
item_1 | item_2 | item_3 | item_4
1 | 1 | 4 | 3
2 | 0 | 2 | 0
0 | 2 | 0 | 1
这是我试过的:
val df = sqlContext.createDataFrame(
Seq((1, 1, 4, 3),
(2, 0, 2, 0),
(0, 2, 0, 1)
).toDF("item_1", "item_2", "item_3", "item_4")
val items = df.select(array(df.columns.map(col(_)): _*)).rdd.map(_.getSeq[Double](0))
并计算元素之间的相关性:
val correlMatrix: Matrix = Statistics.corr(items, "pearson")
出现以下错误消息:
<console>:89: error: type mismatch;
found : org.apache.spark.rdd.RDD[Seq[Double]]
required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
val correlMatrix: Matrix = Statistics.corr(items, "pearson")
我不知道如何从数据框创建 org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
。
这可能是一个非常简单的任务,但我有点挣扎,我很高兴得到任何建议。
例如,您可以使用 VectorAssembler
。 Assemble 向量并转换为 RDD
import org.apache.spark.ml.feature.VectorAssembler
val rows = new VectorAssembler().setInputCols(df.columns).setOutputCol("vs")
.transform(df)
.select("vs")
.rdd
从Row
中提取Vectors
:
火花1.x:
rows.map(_.getAs[org.apache.spark.mllib.linalg.Vector](0))
火花2.x:
rows
.map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
.map(org.apache.spark.mllib.linalg.Vectors.fromML)
关于您的代码:
- 您有
Integer
列而不是 Double
。
- 数据不是
array
,因此您不能使用 _.getSeq[Double](0)
。
如果您的目标是执行皮尔逊相关,您实际上不必使用 RDD 和向量。这是直接在 DataFrame 列上执行皮尔逊相关的示例(所讨论的列是 Doubles 类型)。
代码:
import org.apache.spark.sql.{SQLContext, Row, DataFrame}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
import org.apache.spark.sql.functions._
val rb = spark.read.option("delimiter","|").option("header","false").option("inferSchema","true").format("csv").load("rb.csv").toDF("name","beerId","brewerId","abv","style","appearance","aroma","palate","taste","overall","time","reviewer").cache()
rb.agg(
corr("overall","taste"),
corr("overall","aroma"),
corr("overall","palate"),
corr("overall","appearance"),
corr("overall","abv")
).show()
在这个例子中,我导入了一个数据框(带有自定义分隔符,没有 header 和推断的数据类型),然后简单地对其中具有多个相关性的数据框执行聚合函数.
输出:
+--------------------+--------------------+---------------------+-------------------------+------------------+
|corr(overall, taste)|corr(overall, aroma)|corr(overall, palate)|corr(overall, appearance)|corr(overall, abv)|
+--------------------+--------------------+---------------------+-------------------------+------------------+
| 0.8762432795943761| 0.789023067942876| 0.7008942639550395| 0.5663593891357243|0.3539158620897098|
+--------------------+--------------------+---------------------+-------------------------+------------------+
从结果可以看出,(总体,口味)列高度相关,而(总体,abv)则没有那么多。
这是 Scala Docs DataFrame page which has the Aggregation Correlation Function 的 link。
我是 scala 和 spark 2.1 的新手。 我正在尝试计算数据框中许多元素之间的相关性,如下所示:
item_1 | item_2 | item_3 | item_4
1 | 1 | 4 | 3
2 | 0 | 2 | 0
0 | 2 | 0 | 1
这是我试过的:
val df = sqlContext.createDataFrame(
Seq((1, 1, 4, 3),
(2, 0, 2, 0),
(0, 2, 0, 1)
).toDF("item_1", "item_2", "item_3", "item_4")
val items = df.select(array(df.columns.map(col(_)): _*)).rdd.map(_.getSeq[Double](0))
并计算元素之间的相关性:
val correlMatrix: Matrix = Statistics.corr(items, "pearson")
出现以下错误消息:
<console>:89: error: type mismatch;
found : org.apache.spark.rdd.RDD[Seq[Double]]
required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
val correlMatrix: Matrix = Statistics.corr(items, "pearson")
我不知道如何从数据框创建 org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
。
这可能是一个非常简单的任务,但我有点挣扎,我很高兴得到任何建议。
例如,您可以使用 VectorAssembler
。 Assemble 向量并转换为 RDD
import org.apache.spark.ml.feature.VectorAssembler
val rows = new VectorAssembler().setInputCols(df.columns).setOutputCol("vs")
.transform(df)
.select("vs")
.rdd
从Row
中提取Vectors
:
火花1.x:
rows.map(_.getAs[org.apache.spark.mllib.linalg.Vector](0))
火花2.x:
rows .map(_.getAs[org.apache.spark.ml.linalg.Vector](0)) .map(org.apache.spark.mllib.linalg.Vectors.fromML)
关于您的代码:
- 您有
Integer
列而不是Double
。 - 数据不是
array
,因此您不能使用_.getSeq[Double](0)
。
如果您的目标是执行皮尔逊相关,您实际上不必使用 RDD 和向量。这是直接在 DataFrame 列上执行皮尔逊相关的示例(所讨论的列是 Doubles 类型)。
代码:
import org.apache.spark.sql.{SQLContext, Row, DataFrame}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
import org.apache.spark.sql.functions._
val rb = spark.read.option("delimiter","|").option("header","false").option("inferSchema","true").format("csv").load("rb.csv").toDF("name","beerId","brewerId","abv","style","appearance","aroma","palate","taste","overall","time","reviewer").cache()
rb.agg(
corr("overall","taste"),
corr("overall","aroma"),
corr("overall","palate"),
corr("overall","appearance"),
corr("overall","abv")
).show()
在这个例子中,我导入了一个数据框(带有自定义分隔符,没有 header 和推断的数据类型),然后简单地对其中具有多个相关性的数据框执行聚合函数.
输出:
+--------------------+--------------------+---------------------+-------------------------+------------------+
|corr(overall, taste)|corr(overall, aroma)|corr(overall, palate)|corr(overall, appearance)|corr(overall, abv)|
+--------------------+--------------------+---------------------+-------------------------+------------------+
| 0.8762432795943761| 0.789023067942876| 0.7008942639550395| 0.5663593891357243|0.3539158620897098|
+--------------------+--------------------+---------------------+-------------------------+------------------+
从结果可以看出,(总体,口味)列高度相关,而(总体,abv)则没有那么多。
这是 Scala Docs DataFrame page which has the Aggregation Correlation Function 的 link。