将 org.apache.avro.generic.GenericRecord 转换为 org.apache.spark.sql.Row
Convert org.apache.avro.generic.GenericRecord to org.apache.spark.sql.Row
我有 org.apache.avro.generic.GenericRecord
的列表,avro schema
使用这个我们需要在 SQLContext
API 的帮助下创建 dataframe
,以创建 dataframe
它需要 org.apache.spark.sql.Row
和 avro schema
中的 RDD
。创建 DF 的先决条件是我们应该有 org.apache.spark.sql.Row 的 RDD,它可以使用下面的代码来实现,但有些它不工作并给出错误,示例代码。
1. Convert GenericRecord to Row
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
{
val fields = avroSchema.getFields
var rows = new Seq[Row]
for (avroRecord <- genericRecords) {
var avroFieldsSeq = Seq[Any]();
for (i <- 0 to fields.size - 1) {
avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
}
val avroFieldArr = avroFieldsSeq.toArray
val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
rows = rows :+ genericRow
}
return rows;
}
2. Convert `Avro schema` to `Structtype`
Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType
3. Create `Dataframe` using `SQLContext`
val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
val rowRdd = sc.parallelize(rowSeq, 1)
val finalDF =sqlContext.createDataFrame(rowRDD,structType)
但是它在创建 DataFrame
时抛出错误。有人可以帮我看看上面的代码有什么问题吗?除此之外,如果有人对 dataframe
的转换和创建有不同的逻辑。
每当我调用 Dataframe 上的任何操作时,它都会执行 DAG 并尝试创建 DF 对象,但在此失败并出现以下异常
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
在此之后,我尝试在 spark 提交的 jar 参数中提供正确版本的 jar,并将其他参数作为 --conf spark.driver.userClassPathFirst=true
但现在 MapR as
失败了
ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
我们正在使用 MapR 分发,在 spark-submit 中 class 路径更改后,它失败并出现上述异常。
有人可以帮忙吗,或者我的基本需要是将 Avro GenericRecord 转换成 Spark Row,这样我就可以用它创建 Dataframe,请帮忙
谢谢。
希望这会有所帮助。在第一部分中,您可以找到如何从 GenericRecord 转换为 Row
从 RDD[GenericRecord] 创建数据帧时有几个步骤
- 首先需要将org.apache.avro.generic.GenericRecord转换成org.apache.spark.sql.Row
Use com.databricks.spark.avro.SchemaConverters.createConverterToSQL(
sourceAvroSchema: Schema,targetSqlType: DataType)
这是spark-avro3.2版本的私有方法。如果我们有相同或小于 3.2,那么将此方法复制到您自己的 util class 中并使用它,否则直接使用它。
- 从行 (rowSeq) 集合创建数据框。
val rdd = ssc.sparkContext.parallelize(rowSeq,numParition) val
dataframe = sparkSession.createDataFrame(rowRDD, schemaType)
这解决了我的问题。
也许这有助于晚些时候加入游戏的人。
由于 spark-avro
已被弃用并且现在已集成到 Spark 中,因此可以通过不同的方式来实现。
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.encoders.RowEncoder
...
val avroSchema = data.head.getSchema
val sparkTypes = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val converter = new AvroDeserializer(avroSchema, sparkTypes)
val enconder = RowEncoder.apply(sparkTypes).resolveAndBind()
val rows = data.map { record =>
enconder.fromRow(converter.deserialize(record).asInstanceOf[InternalRow])
}
val df = sparkSession.sqlContext.createDataFrame(sparkSession.sparkContext.parallelize(rows), sparkTypes)
我有 org.apache.avro.generic.GenericRecord
的列表,avro schema
使用这个我们需要在 SQLContext
API 的帮助下创建 dataframe
,以创建 dataframe
它需要 org.apache.spark.sql.Row
和 avro schema
中的 RDD
。创建 DF 的先决条件是我们应该有 org.apache.spark.sql.Row 的 RDD,它可以使用下面的代码来实现,但有些它不工作并给出错误,示例代码。
1. Convert GenericRecord to Row
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
{
val fields = avroSchema.getFields
var rows = new Seq[Row]
for (avroRecord <- genericRecords) {
var avroFieldsSeq = Seq[Any]();
for (i <- 0 to fields.size - 1) {
avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
}
val avroFieldArr = avroFieldsSeq.toArray
val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
rows = rows :+ genericRow
}
return rows;
}
2. Convert `Avro schema` to `Structtype`
Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType
3. Create `Dataframe` using `SQLContext`
val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
val rowRdd = sc.parallelize(rowSeq, 1)
val finalDF =sqlContext.createDataFrame(rowRDD,structType)
但是它在创建 DataFrame
时抛出错误。有人可以帮我看看上面的代码有什么问题吗?除此之外,如果有人对 dataframe
的转换和创建有不同的逻辑。
每当我调用 Dataframe 上的任何操作时,它都会执行 DAG 并尝试创建 DF 对象,但在此失败并出现以下异常
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
在此之后,我尝试在 spark 提交的 jar 参数中提供正确版本的 jar,并将其他参数作为 --conf spark.driver.userClassPathFirst=true 但现在 MapR as
失败了ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
我们正在使用 MapR 分发,在 spark-submit 中 class 路径更改后,它失败并出现上述异常。
有人可以帮忙吗,或者我的基本需要是将 Avro GenericRecord 转换成 Spark Row,这样我就可以用它创建 Dataframe,请帮忙
谢谢。
希望这会有所帮助。在第一部分中,您可以找到如何从 GenericRecord 转换为 Row
从 RDD[GenericRecord] 创建数据帧时有几个步骤
- 首先需要将org.apache.avro.generic.GenericRecord转换成org.apache.spark.sql.Row
Use com.databricks.spark.avro.SchemaConverters.createConverterToSQL( sourceAvroSchema: Schema,targetSqlType: DataType)
这是spark-avro3.2版本的私有方法。如果我们有相同或小于 3.2,那么将此方法复制到您自己的 util class 中并使用它,否则直接使用它。
- 从行 (rowSeq) 集合创建数据框。
val rdd = ssc.sparkContext.parallelize(rowSeq,numParition) val dataframe = sparkSession.createDataFrame(rowRDD, schemaType)
这解决了我的问题。
也许这有助于晚些时候加入游戏的人。
由于 spark-avro
已被弃用并且现在已集成到 Spark 中,因此可以通过不同的方式来实现。
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.encoders.RowEncoder
...
val avroSchema = data.head.getSchema
val sparkTypes = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val converter = new AvroDeserializer(avroSchema, sparkTypes)
val enconder = RowEncoder.apply(sparkTypes).resolveAndBind()
val rows = data.map { record =>
enconder.fromRow(converter.deserialize(record).asInstanceOf[InternalRow])
}
val df = sparkSession.sqlContext.createDataFrame(sparkSession.sparkContext.parallelize(rows), sparkTypes)