如何将 RDD[GenericRecord] 转换为 Scala 中的数据帧?

How to convert RDD[GenericRecord] to dataframe in scala?

我使用 Avro(序列化器和反序列化器)从 kafka 主题获取推文。 然后我创建了一个 spark 消费者,它在 RDD [GenericRecord] 的 Dstream 中提取推文。 现在我想将每个 rdd 转换为数据框,以通过 SQL 分析这些推文。 请将 RDD[GenericRecord] 转换为数据帧的任何解决方案?

您可以使用 createDataFrame(rowRDD: RDD[Row], schema: StructType),它在 SQLContext 对象中可用。转换旧 DataFrame 的 RDD 的示例:

import sqlContext.implicits.
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

请注意,无需显式设置任何架构列。我们重用了旧的 DF 的模式,它是 StructType class 并且可以很容易地扩展。但是,这种方法有时是行不通的,并且在某些情况下可能不如第一种方法有效。

我花了一些时间来尝试完成这项工作(特别是如何正确反序列化数据,但看起来您已经涵盖了这一点)...已更新

  //Define function to convert from GenericRecord to Row
  def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
    val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
    import scala.collection.JavaConversions._
    for (field <- record.getSchema.getFields) {
      objectArray(field.pos) = record.get(field.pos)
    }

    new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
  }

//Inside your stream foreachRDD
val yourGenericRecordRDD = ... 
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))

var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])

如您所见,我正在使用 SchemaConverter 从您用于反序列化的模式中获取数据帧结构(这对于模式注册表来说可能会更痛苦)。为此,您需要以下依赖项

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>

您需要根据您的版本更改您的 spark 版本。

更新:上面的代码仅适用于 flat avro 模式。

对于嵌套结构,我使用了一些不同的东西。您可以复制 class SchemaConverters, it has to be inside of com.databricks.spark.avro (it uses some protected classes from the databricks package) or you can try to use the spark-bigquery 依赖项。默认情况下 class 将无法访问,因此您需要在包 com.databricks.spark.avro 中创建一个 class 以访问工厂方法。

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

class SchemaConverterUtils {

  def converterSql(schema : Schema, sqlType : StructType) = {
    createConverterToSQL(schema, sqlType)
  }

}

之后你应该可以像

一样转换数据
val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
/// 
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
... 
val rowRdd = genericRecordRDD.flatMap(record => {
        Try(converter(record).asInstanceOf[Row]).toOption
      })
//To DataFrame
 val df = sqlContext.createDataFrame(rowRdd, sqlType)

虽然这样的事情可能对你有帮助,

val stream = ...

val dfStream = stream.transform(rdd:RDD[GenericRecord]=>{
     val df = rdd.map(_.toSeq)
              .map(seq=> Row.fromSeq(seq))
              .toDF(col1,col2, ....)

     df
})

我想向您推荐另一种方法。使用 Spark 2.x,您可以跳过创建 DStreams 的整个过程。相反,您可以使用结构化流来执行类似的操作,

val df = ss.readStream
  .format("com.databricks.spark.avro")
  .load("/path/to/files")

这将为您提供一个可以直接查询的数据框。这里,ss 是 spark session 的实例。 /path/to/files 是从 kafka 转储所有 avro 文件的地方。

PS: 您可能需要导入 spark-avro

libraryDependencies += "com.databricks" %% "spark-avro" % "4.0.0"

希望这对您有所帮助。干杯

and 的组合对我有用。

我使用以下内容创建了 MySchemaConversions

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

然后我用了

val myAvroType = SchemaConverters.toSqlType(schema).dataType
val myAvroRecordConverter = MySchemaConversions.createConverterToSQL(schema, myAvroType)

// unionedResultRdd 是 unionRDD[GenericRecord]

var rowRDD = unionedResultRdd.map(record => MyObject.myConverter(record, myAvroRecordConverter))
 val df = sparkSession.createDataFrame(rowRDD , myAvroType.asInstanceOf[StructType])

在对象 MyObject 中使用 myConverter 的好处是您不会遇到序列化问题 (java.io.NotSerializableException)。

object MyObject{
    def myConverter(record: GenericRecord,
        myAvroRecordConverter: (GenericRecord) => Row): Row =
            myAvroRecordConverter.apply(record)
}