如何将 Kafka header 的值作为单个列获取到 Spark 数据集?

How to get Kafka header's value to Spark Dataset as a single column?

我们有一个启用了 headers 的 Kafka 流

  .option("includeHeaders", true)

从而使它们存储为高级数据集的列,承载具有键和值的内部结构数组:

root
 |-- topic: string (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)

我可以访问所需的 header,它在数组中的顺序如下:

val controlDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaLocation)
      .option("includeHeaders", true)
      .option("failOnDataLoss", value = false)
      .option("subscribe", "mytopic")
      .load()
      .withColumn("acceptTimestamp", element_at(col("headers"),1))
      .withColumn("acceptTimestamp2", col("acceptTimestamp.value").cast("STRING"))
 

但是这个解决方案看起来很脆弱,因为在另一端生成的 header 的顺序总是可以随着更新而改变,而只有键名在那里看起来是稳定的。我如何查找结构键并提取所需的结构而不是指向数组索引?

UPD.

感谢 Alex Ott 的帮助,我找到了将我想要的内容放入以下列的解决方案:

.withColumn("headers1", map_from_entries(col("headers")))
.withColumn("acceptTimestamp2", col("headers1.acceptTimestamp").cast("STRING"))

您可以使用 map_from_entries 函数将结构数组转换为映射,您可以在映射中按名称访问条目。

import org.apache.spark.sql.functions.map_from_entries

....
select(map_from_entries("headers").alias("headers"), ...)

但我记得,header 名称可能不是唯一的,这是将它们作为 key/value 对数组发送的主要原因。

另一种方法是使用 filter 函数按名称查找 headers - 这将允许处理 non-unique headers.

P.S。我使用了 Python 文档,因为我可以 link 单独的函数——这在 Scala 文档中并不容易。