Spark DataFrame column names not passed to slave nodes?
I am applying a function f() to each row of a DataFrame (call it df) via the map method, but I see a NullPointerException when I call collect on the resulting RDD if df.columns is passed as an argument to f() (makeLP() in the code below).
The following Scala code, which can be pasted into spark-shell, shows a minimal example of the problem (see the function prepRDD_buggy()). I have also posted my current workaround in the function prepRDD(), where the only difference is that the column names are passed as a val rather than as df.columns.
Could some Spark expert please point out the exact reason why this happens, or confirm our hypothesis that the slave nodes do not get the DataFrame column names?
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// A Simple DataFrame
val dataRDD: RDD[Row] = sc.parallelize(Array(
  Row(1.0,2.1,3.3),
  Row(3.4,5.9,8.9),
  Row(3.1,2.3,4.1)))
val struct: StructType = StructType(
StructField("y", DoubleType, false) ::
StructField("x1", DoubleType, false) ::
StructField("x2", DoubleType, false) :: Nil)
val df: DataFrame = sqlContext.createDataFrame(dataRDD, struct)
// Make LabeledPoint object from Row objects
def makeLP(row: Row, colnames: Array[String]) =
  LabeledPoint(row.getDouble(0),
    Vectors.dense((1 until row.length).toArray map (i => row.getDouble(i))))
// Make RDD[LabeledPoint] from DataFrame
def prepRDD_buggy(df: DataFrame): RDD[LabeledPoint] = {
  df map (row => makeLP(row, df.columns))
}
val mat_buggy = prepRDD_buggy(df)
mat_buggy.collect // throws NullPointerException !
// Make RDD[LabeledPoint] from DataFrame
def prepRDD(df: DataFrame): RDD[LabeledPoint] = {
  val cnames = df.columns
  df map (row => makeLP(row, cnames))
}
val mat = prepRDD(df)
mat.collect // Works fine
Here are the first few lines of the (very verbose) error message that I see when I run mat_buggy.collect in my spark-shell:
15/12/24 18:09:28 INFO SparkContext: Starting job: collect at <console>:42
15/12/24 18:09:28 INFO DAGScheduler: Got job 0 (collect at <console>:42) with 2 output partitions
15/12/24 18:09:28 INFO DAGScheduler: Final stage: ResultStage 0(collect at <console>:42)
15/12/24 18:09:28 INFO DAGScheduler: Parents of final stage: List()
15/12/24 18:09:28 INFO DAGScheduler: Missing parents: List()
15/12/24 18:09:28 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at map at <console>:38), which has no missing parents
15/12/24 18:09:28 INFO MemoryStore: ensureFreeSpace(11600) called with curMem=0, maxMem=560993402
15/12/24 18:09:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 11.3 KB, free 535.0 MB)
15/12/24 18:09:28 INFO MemoryStore: ensureFreeSpace(4540) called with curMem=11600, maxMem=560993402
15/12/24 18:09:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.4 KB, free 535.0 MB)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.10.98:53386 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/12/24 18:09:28 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at map at <console>:38)
15/12/24 18:09:28 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
15/12/24 18:09:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-10-10-217.ec2.internal, PROCESS_LOCAL, 2385 bytes)
15/12/24 18:09:28 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ip-10-10-10-213.ec2.internal, PROCESS_LOCAL, 2385 bytes)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-10-10-213.ec2.internal:56642 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-10-10-217.ec2.internal:56396 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:29 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-10-10-217.ec2.internal): java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:290)
at org.apache.spark.sql.DataFrame.columns(DataFrame.scala:306)
at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$prepRDD_buggy.apply(<console>:38)
at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$prepRDD_buggy.apply(<console>:38)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
Your assumption is correct. columns requires access to schema, and schema depends on queryExecution, which is transient and therefore not shipped to the workers. So what you are doing in prepRDD is more or less correct, although the same information can be extracted directly from the rows:
scala> df.rdd.map(_.schema.fieldNames).first
res14: Array[String] = Array(y, x1, x2, x3)
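For illustration, here is a sketch of how the buggy function could be rewritten along those lines (prepRDD_fromRow is a hypothetical name; it reuses makeLP and df from the question). The field names are read from each Row's own schema, so the closure no longer captures the driver-side DataFrame:
// Sketch: take the column names from the Row itself instead of from the
// DataFrame, so nothing transient is serialized to the workers.
def prepRDD_fromRow(df: DataFrame): RDD[LabeledPoint] = {
  df map (row => makeLP(row, row.schema.fieldNames))
}
prepRDD_fromRow(df).collect // should behave like prepRDD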
On a side note, a VectorAssembler plus a simple map would be a better choice here.
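A minimal sketch of what that could look like (assuming Spark 1.x, where VectorAssembler from the ml package produces mllib vectors, and reusing the df from the question; the choice of input columns is an assumption):
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vector
// Assemble all non-label columns into a single "features" vector column.
val assembler = new VectorAssembler()
  .setInputCols(df.columns.filter(_ != "y"))
  .setOutputCol("features")
// Map the assembled rows to LabeledPoint using only the label and the
// features vector, without touching column names on the workers.
val labeled: RDD[LabeledPoint] = assembler.transform(df)
  .select("y", "features")
  .map(row => LabeledPoint(row.getDouble(0), row.getAs[Vector](1)))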