如何将 Kafka header 的值作为单个列获取到 Spark 数据集?
How to get Kafka header's value to Spark Dataset as a single column?
我们有一个启用了 headers 的 Kafka 流
.option("includeHeaders", true)
从而使它们存储为高级数据集的列,承载具有键和值的内部结构数组:
root
|-- topic: string (nullable = true)
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- headers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: binary (nullable = true)
我可以访问所需的 header,它在数组中的顺序如下:
val controlDataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaLocation)
.option("includeHeaders", true)
.option("failOnDataLoss", value = false)
.option("subscribe", "mytopic")
.load()
.withColumn("acceptTimestamp", element_at(col("headers"),1))
.withColumn("acceptTimestamp2", col("acceptTimestamp.value").cast("STRING"))
但是这个解决方案看起来很脆弱,因为在另一端生成的 header 的顺序总是可以随着更新而改变,而只有键名在那里看起来是稳定的。我如何查找结构键并提取所需的结构而不是指向数组索引?
UPD.
感谢 Alex Ott 的帮助,我找到了将我想要的内容放入以下列的解决方案:
.withColumn("headers1", map_from_entries(col("headers")))
.withColumn("acceptTimestamp2", col("headers1.acceptTimestamp").cast("STRING"))
您可以使用 map_from_entries 函数将结构数组转换为映射,您可以在映射中按名称访问条目。
import org.apache.spark.sql.functions.map_from_entries
....
select(map_from_entries("headers").alias("headers"), ...)
但我记得,header 名称可能不是唯一的,这是将它们作为 key/value 对数组发送的主要原因。
另一种方法是使用 filter 函数按名称查找 headers - 这将允许处理 non-unique headers.
P.S。我使用了 Python 文档,因为我可以 link 单独的函数——这在 Scala 文档中并不容易。
我们有一个启用了 headers 的 Kafka 流
.option("includeHeaders", true)
从而使它们存储为高级数据集的列,承载具有键和值的内部结构数组:
root
|-- topic: string (nullable = true)
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- headers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: binary (nullable = true)
我可以访问所需的 header,它在数组中的顺序如下:
val controlDataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaLocation)
.option("includeHeaders", true)
.option("failOnDataLoss", value = false)
.option("subscribe", "mytopic")
.load()
.withColumn("acceptTimestamp", element_at(col("headers"),1))
.withColumn("acceptTimestamp2", col("acceptTimestamp.value").cast("STRING"))
但是这个解决方案看起来很脆弱,因为在另一端生成的 header 的顺序总是可以随着更新而改变,而只有键名在那里看起来是稳定的。我如何查找结构键并提取所需的结构而不是指向数组索引?
UPD.
感谢 Alex Ott 的帮助,我找到了将我想要的内容放入以下列的解决方案:
.withColumn("headers1", map_from_entries(col("headers")))
.withColumn("acceptTimestamp2", col("headers1.acceptTimestamp").cast("STRING"))
您可以使用 map_from_entries 函数将结构数组转换为映射,您可以在映射中按名称访问条目。
import org.apache.spark.sql.functions.map_from_entries
....
select(map_from_entries("headers").alias("headers"), ...)
但我记得,header 名称可能不是唯一的,这是将它们作为 key/value 对数组发送的主要原因。
另一种方法是使用 filter 函数按名称查找 headers - 这将允许处理 non-unique headers.
P.S。我使用了 Python 文档,因为我可以 link 单独的函数——这在 Scala 文档中并不容易。