为什么 Spark DataFrames 不更改其模式以及如何处理它?

Why do Spark DataFrames not change their schema and what to do about it?

我正在使用 Spark 2.1 的结构化流读取内容为二进制 avro 编码的 Kafka 主题。

因此,设置DataFrame后:

val messages = spark
  .readStream
  .format("kafka")
  .options(kafkaConf)
  .option("subscribe", config.getString("kafka.topic"))
  .load()

如果我打印此 DataFrame (messages.printSchema()) 的架构,我将得到以下信息:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- timestampType: integer (nullable = true)

这个问题应该与 avro 解码问题正交,但假设我想以某种方式将消息 DataFrame 中的 value 内容转换为 Dataset[BusinessObject],通过一个函数 Array[Byte] => BusinessObject。例如完整性,函数可能只是(使用 avro4s):

case class BusinessObject(userId: String, eventId: String)

def fromAvro(bytes: Array[Byte]): BusinessObject =
    AvroInputStream.binary[BusinessObject](
        new ByteArrayInputStream(bytes)
    ).iterator.next

当然,as miguno says in this related question 我不能只用 DataFrame.map() 应用转换,因为我需要为这样的 BusinessObject.

提供一个隐式编码器

可以定义为:

implicit val myEncoder : Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject]

现在,执行映射:

val transformedMessages : Dataset[BusinessObjecŧ] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))

但是如果我查询新架构,我会得到以下信息:

root
 |-- value: binary (nullable = true)

而且我认为这没有任何意义,因为数据集应该使用 BusinessObject 案例的 Product 属性 - class 并获得正确的值。

我在 reader 中看到了一些关于 Spark SQL 使用 .schema(StructType) 的示例,但我不能这样做,不仅仅是因为我使用 readStream , 但因为我实际上必须先转换列才能在这些字段中进行操作。

我希望告诉 Spark SQL 引擎,transformedMessages 数据集模式是一个 StructField,带有 case class' 字段。

我会说你得到了你想要的。正如我 Encoders.kryo 生成一个带有序列化对象的 blob 。它的内部结构对于 SQL 引擎来说是不透明的,并且在不反序列化对象的情况下无法访问。如此有效,您的代码所做的就是采用一种序列化格式并将其替换为另一种格式。

您遇到的另一个问题是您尝试将动态类型的 DataFrame (Dataset[Row]) 与静态类型的对象混合使用。排除 UDT API Spark SQL 不会像这样工作。您可以静态地使用 DatasetDataFrame 以及使用 struct 层次结构编码的对象结构。

好消息是像 BusinessObject 这样简单的产品类型应该可以正常工作,而不需要笨拙的 Encoders.kryo。只需跳过 Kryo 编码器定义并确保导入隐式编码器:

import spark.implicits._