我应该如何将 org.apache.spark.ml.linalg.Vector 的 RDD 转换为数据集?

How should I convert an RDD of org.apache.spark.ml.linalg.Vector to Dataset?

我很难理解 RDD、DataSet 和 DataFrames 之间的转换是如何工作的。 我是 Spark 的新手,每次我需要从一个数据模型传递到另一个数据模型(尤其是从 RDD 到数据集和数据帧)时都会卡住。 谁能告诉我正确的做法?

例如,现在我有一个 RDD[org.apache.spark.ml.linalg.Vector],我需要将它传递给我的机器学习算法,例如 KMeans (Spark DataSet MLlib)。因此,我需要将其转换为具有一个名为 "features" 的列的数据集,该列应包含 Vector 类型的行。我应该怎么做?

要将 RDD 转换为 dataframe,最简单的方法是在 Scala 中使用 toDF()。要使用此函数,必须导入使用 SparkSession 对象完成的隐含函数。可以这样操作:

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = rdd.toDF("features")

toDF() 采用元组的 RDD。当 RDD 由普通 Scala 对象构建时,它们将被隐式转换,即不需要做任何事情,当 RDD 有多个列时也不需要做任何事情,RDD 已经包含一个元组。然而,在这种特殊情况中,您需要先将RDD[org.apache.spark.ml.linalg.Vector]转换为RDD[(org.apache.spark.ml.linalg.Vector)]。因此,需要对元组进行如下转换:

val df = rdd.map(Tuple1(_)).toDF("features")

上面的代码会将 RDD 转换为具有称为特征的单列的数据框。


要转换为 数据集,最简单的方法是使用案例 class。确保 case class 定义在 Main 对象之外。首先将 RDD 转换为数据帧,然后执行以下操作:

case class A(features: org.apache.spark.ml.linalg.Vector)

val ds = df.as[A]

要显示所有可能的转换,可以使用 .rdd:

从数据框或数据集中访问基础 RDD
val rdd = df.rdd

与其在 RDD 和 dataframes/datasets 之间来回转换,通常更容易使用数据帧 API 进行所有计算。如果没有合适的函数来做你想做的事,通常可以定义一个 UDF,即用户定义的函数。参见此处的示例:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html

您只需要 Encoder。进口

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.ml.linalg

RDD:

val rdd = sc.parallelize(Seq(
  linalg.Vectors.dense(1.0, 2.0), linalg.Vectors.sparse(2, Array(), Array())
))

转化率:

val ds = spark.createDataset(rdd)(ExpressionEncoder(): Encoder[linalg.Vector])
 .toDF("features")

ds.show
// +---------+
// | features|
// +---------+
// |[1.0,2.0]|
// |(2,[],[])|
// +---------+


ds.printSchema
// root
//  |-- features: vector (nullable = true)