Saving Spark DataFrames with nested User Data Types
I want to save a Spark DataFrame (as a Parquet file) that contains a custom class as a column. This class is composed of a Seq of another custom class. To do so, I created a UserDefinedType class for each of these classes, in a similar fashion to VectorUDT. I can use the DataFrame as intended, but I cannot save it to disk as Parquet (or JSON).
I reported this as a bug, but maybe there is a problem with my code. I have implemented a simpler example to illustrate the problem:
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list: Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType =
    StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))

  override def userClass: Class[A] = classOf[A]

  override def serialize(obj: Any): Any = obj match {
    case A(list) =>
      val row = new GenericMutableRow(1)
      row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
      row
  }

  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
    }
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num: Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType =
    StructType(Seq(StructField("num", IntegerType, nullable = false)))

  override def userClass: Class[B] = classOf[B]

  override def serialize(obj: Any): Any = obj match {
    case B(num) =>
      val row = new GenericMutableRow(1)
      row.setInt(0, num)
      row
  }

  override def deserialize(datum: Any): B = {
    datum match {
      case row: InternalRow => new B(row.getInt(0))
    }
  }
}

object BUDT extends BUDT

object TestNested {
  def main(args: Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))),
                  new A(Seq(new B(3), new B(4))))

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(1 to 2 zip col).toDF()
    df.show()

    df.write.mode(SaveMode.Overwrite).save(...)
  }
}
This results in the following error:
15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; }
    at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42)
    at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField.apply(CatalystSchemaConverter.scala:522)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField.apply(CatalystSchemaConverter.scala:521)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert.apply(CatalystSchemaConverter.scala:311)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert.apply(CatalystSchemaConverter.scala:311)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:92)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58)
    at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon.newInstance(ParquetRelation.scala:272)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):
If I save a DataFrame of B instead of A, there is no problem, since B has no nested custom class. Am I missing something?
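For reference, a minimal sketch of that flat case, reusing the B/BUDT definitions and the SparkContext/SQLContext from the example above (the output path is a placeholder added here for illustration, not part of the original code):

// Sketch of the flat case: a DataFrame whose UDT column is just B
// (no nested custom class). This is the variant that saves without error.
val flatCol = Seq(new B(1), new B(2), new B(3), new B(4))
val dfB = sc.parallelize(1 to 4 zip flatCol).toDF()
dfB.show()
dfB.write.mode(SaveMode.Overwrite).parquet("/tmp/test-flat-udt")  // placeholder path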
I had to make four changes to your code to get it working (tested on Spark 1.6.0 on Linux), and I think I can mostly explain why they are needed. However, I do wonder whether there is a simpler solution. All of the changes are in AUDT, as follows:

- When defining sqlType, make it depend on BUDT.sqlType, rather than just BUDT.
- In serialize(), call BUDT.serialize() on each list element.
- In deserialize():
  - call toArray(BUDT.sqlType) instead of toArray(BUDT)
  - call BUDT.deserialize() on each element

Here is the resulting code:
class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType =
    StructType(
      Seq(StructField("list",
                      ArrayType(BUDT.sqlType, containsNull = false),
                      nullable = true)))

  override def userClass: Class[A] = classOf[A]

  override def serialize(obj: Any): Any =
    obj match {
      case A(list) =>
        val row = new GenericMutableRow(1)
        val elements =
          list.map(_.asInstanceOf[Any])
              .map(e => BUDT.serialize(e))
              .toArray
        row.update(0, new GenericArrayData(elements))
        row
    }

  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow =>
        val first = row.getArray(0)
        val bs: Array[InternalRow] = first.toArray(BUDT.sqlType)
        val bseq = bs.toSeq.map(e => BUDT.deserialize(e))
        val a = new A(bseq)
        a
    }
  }
}
All four changes have the same character: the relationship between the handling of A and the handling of B is now explicit: for schema typing, for serialization, and for deserialization. The original code seems to rest on the assumption that Spark SQL will "just figure it out", which may be reasonable, but apparently it doesn't.
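As a quick sanity check, one could round-trip the DataFrame built in TestNested.main using the fixed AUDT above. This is only a sketch, and the output path is a placeholder of my own, not from the original code:

// Round-trip sketch with the fixed AUDT: the write should now succeed,
// and reading the file back should reproduce the nested A(Seq(B, ...)) column.
val outputPath = "/tmp/test-nested-udt"  // placeholder path
df.write.mode(SaveMode.Overwrite).parquet(outputPath)
val reloaded = sqlContext.read.parquet(outputPath)
reloaded.show()
reloaded.printSchema()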