我应该如何将 org.apache.spark.ml.linalg.Vector 的 RDD 转换为数据集?
How should I convert an RDD of org.apache.spark.ml.linalg.Vector to Dataset?
我很难理解 RDD、DataSet 和 DataFrames 之间的转换是如何工作的。
我是 Spark 的新手,每次我需要从一个数据模型传递到另一个数据模型(尤其是从 RDD 到数据集和数据帧)时都会卡住。
谁能告诉我正确的做法?
例如,现在我有一个 RDD[org.apache.spark.ml.linalg.Vector]
,我需要将它传递给我的机器学习算法,例如 KMeans (Spark DataSet MLlib)。因此,我需要将其转换为具有一个名为 "features" 的列的数据集,该列应包含 Vector 类型的行。我应该怎么做?
要将 RDD 转换为 dataframe,最简单的方法是在 Scala 中使用 toDF()
。要使用此函数,必须导入使用 SparkSession
对象完成的隐含函数。可以这样操作:
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val df = rdd.toDF("features")
toDF()
采用元组的 RDD。当 RDD 由普通 Scala 对象构建时,它们将被隐式转换,即不需要做任何事情,当 RDD 有多个列时也不需要做任何事情,RDD 已经包含一个元组。然而,在这种特殊情况中,您需要先将RDD[org.apache.spark.ml.linalg.Vector]
转换为RDD[(org.apache.spark.ml.linalg.Vector)]
。因此,需要对元组进行如下转换:
val df = rdd.map(Tuple1(_)).toDF("features")
上面的代码会将 RDD 转换为具有称为特征的单列的数据框。
要转换为 数据集,最简单的方法是使用案例 class。确保 case class 定义在 Main 对象之外。首先将 RDD 转换为数据帧,然后执行以下操作:
case class A(features: org.apache.spark.ml.linalg.Vector)
val ds = df.as[A]
要显示所有可能的转换,可以使用 .rdd
:
从数据框或数据集中访问基础 RDD
val rdd = df.rdd
与其在 RDD 和 dataframes/datasets 之间来回转换,通常更容易使用数据帧 API 进行所有计算。如果没有合适的函数来做你想做的事,通常可以定义一个 UDF,即用户定义的函数。参见此处的示例:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html
您只需要 Encoder
。进口
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.ml.linalg
RDD
:
val rdd = sc.parallelize(Seq(
linalg.Vectors.dense(1.0, 2.0), linalg.Vectors.sparse(2, Array(), Array())
))
转化率:
val ds = spark.createDataset(rdd)(ExpressionEncoder(): Encoder[linalg.Vector])
.toDF("features")
ds.show
// +---------+
// | features|
// +---------+
// |[1.0,2.0]|
// |(2,[],[])|
// +---------+
ds.printSchema
// root
// |-- features: vector (nullable = true)
我很难理解 RDD、DataSet 和 DataFrames 之间的转换是如何工作的。 我是 Spark 的新手,每次我需要从一个数据模型传递到另一个数据模型(尤其是从 RDD 到数据集和数据帧)时都会卡住。 谁能告诉我正确的做法?
例如,现在我有一个 RDD[org.apache.spark.ml.linalg.Vector]
,我需要将它传递给我的机器学习算法,例如 KMeans (Spark DataSet MLlib)。因此,我需要将其转换为具有一个名为 "features" 的列的数据集,该列应包含 Vector 类型的行。我应该怎么做?
要将 RDD 转换为 dataframe,最简单的方法是在 Scala 中使用 toDF()
。要使用此函数,必须导入使用 SparkSession
对象完成的隐含函数。可以这样操作:
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val df = rdd.toDF("features")
toDF()
采用元组的 RDD。当 RDD 由普通 Scala 对象构建时,它们将被隐式转换,即不需要做任何事情,当 RDD 有多个列时也不需要做任何事情,RDD 已经包含一个元组。然而,在这种特殊情况中,您需要先将RDD[org.apache.spark.ml.linalg.Vector]
转换为RDD[(org.apache.spark.ml.linalg.Vector)]
。因此,需要对元组进行如下转换:
val df = rdd.map(Tuple1(_)).toDF("features")
上面的代码会将 RDD 转换为具有称为特征的单列的数据框。
要转换为 数据集,最简单的方法是使用案例 class。确保 case class 定义在 Main 对象之外。首先将 RDD 转换为数据帧,然后执行以下操作:
case class A(features: org.apache.spark.ml.linalg.Vector)
val ds = df.as[A]
要显示所有可能的转换,可以使用 .rdd
:
val rdd = df.rdd
与其在 RDD 和 dataframes/datasets 之间来回转换,通常更容易使用数据帧 API 进行所有计算。如果没有合适的函数来做你想做的事,通常可以定义一个 UDF,即用户定义的函数。参见此处的示例:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html
您只需要 Encoder
。进口
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.ml.linalg
RDD
:
val rdd = sc.parallelize(Seq(
linalg.Vectors.dense(1.0, 2.0), linalg.Vectors.sparse(2, Array(), Array())
))
转化率:
val ds = spark.createDataset(rdd)(ExpressionEncoder(): Encoder[linalg.Vector])
.toDF("features")
ds.show
// +---------+
// | features|
// +---------+
// |[1.0,2.0]|
// |(2,[],[])|
// +---------+
ds.printSchema
// root
// |-- features: vector (nullable = true)