如何将 RDD[Row] 转换回 DataFrame
How to convert an RDD[Row] back to DataFrame
我一直在尝试将 RDD 转换为 DataFrame,然后再转换回来。首先,我有一个名为 dataPair 的 (Int, Int) 类型的 RDD。然后我创建了一个 DataFrame object 列 headers 使用:
val dataFrame = dataPair.toDF(header(0), header(1))
然后我使用以下方法将它从 DataFrame 转换回 RDD:
val testRDD = dataFrame.rdd
其中 returns 类型为 org.apache.spark.sql.Row 的 RDD(不是 (Int, Int))。然后我想使用 .toDF 将其转换回 RDD,但出现错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
我已经尝试为 testRDD 定义类型为 Data(Int, Int) 的模式,但我得到了类型不匹配异常:
error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
val testRDD: RDD[Data] = dataFrame.rdd
^
我已经导入了
import sqlContext.implicits._
要从行的 RDD 创建 DataFrame,通常有两个主要选项:
1) 您可以使用 toDF()
可以由 import sqlContext.implicits._
导入。但是,此方法仅适用于以下类型的 RDD:
RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(来源:SQLContext.implicits
对象的 Scaladoc)
最后一个签名实际上意味着它可以用于元组的RDD或case classes的RDD(因为元组和case classes是subclasses scala.Product).
因此,要将此方法用于 RDD[Row]
,您必须将其映射到 RDD[T <: scala.Product]
。这可以通过将每一行映射到自定义案例 class 或元组来完成,如以下代码片段所示:
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
或
case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
这种方法的主要缺点(在我看来)是您必须在映射函数中逐列显式设置生成的 DataFrame 的架构。如果您事先不知道架构,也许这可以通过编程方式完成,但事情可能会变得有点混乱。因此,或者,还有另一种选择:
2) 您可以使用 createDataFrame(rowRDD: RDD[Row], schema: StructType)
,它在 SQLContext 对象中可用。示例:
val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
请注意,无需显式设置任何架构列。我们重用了旧的DF的schema,它是StructType
class并且可以很容易地扩展。但是,这种方法有时是行不通的,并且在某些情况下可能不如第一种方法有效。
我希望它比以前更清楚。干杯。
我一直在尝试将 RDD 转换为 DataFrame,然后再转换回来。首先,我有一个名为 dataPair 的 (Int, Int) 类型的 RDD。然后我创建了一个 DataFrame object 列 headers 使用:
val dataFrame = dataPair.toDF(header(0), header(1))
然后我使用以下方法将它从 DataFrame 转换回 RDD:
val testRDD = dataFrame.rdd
其中 returns 类型为 org.apache.spark.sql.Row 的 RDD(不是 (Int, Int))。然后我想使用 .toDF 将其转换回 RDD,但出现错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
我已经尝试为 testRDD 定义类型为 Data(Int, Int) 的模式,但我得到了类型不匹配异常:
error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
val testRDD: RDD[Data] = dataFrame.rdd
^
我已经导入了
import sqlContext.implicits._
要从行的 RDD 创建 DataFrame,通常有两个主要选项:
1) 您可以使用 toDF()
可以由 import sqlContext.implicits._
导入。但是,此方法仅适用于以下类型的 RDD:
RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(来源:SQLContext.implicits
对象的 Scaladoc)
最后一个签名实际上意味着它可以用于元组的RDD或case classes的RDD(因为元组和case classes是subclasses scala.Product).
因此,要将此方法用于 RDD[Row]
,您必须将其映射到 RDD[T <: scala.Product]
。这可以通过将每一行映射到自定义案例 class 或元组来完成,如以下代码片段所示:
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
或
case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
这种方法的主要缺点(在我看来)是您必须在映射函数中逐列显式设置生成的 DataFrame 的架构。如果您事先不知道架构,也许这可以通过编程方式完成,但事情可能会变得有点混乱。因此,或者,还有另一种选择:
2) 您可以使用 createDataFrame(rowRDD: RDD[Row], schema: StructType)
,它在 SQLContext 对象中可用。示例:
val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
请注意,无需显式设置任何架构列。我们重用了旧的DF的schema,它是StructType
class并且可以很容易地扩展。但是,这种方法有时是行不通的,并且在某些情况下可能不如第一种方法有效。
我希望它比以前更清楚。干杯。