从流中派生新列

Derive new column from stream

我正在使用 Spark Structured Streaming 从 Kafka 读取数据,我正在尝试根据列表的内容创建一个新列。

我这样建模我的数据:

case class Product(properties: List[Property])
case class Property(code: String, value: String)

我是这样读的:

spark
  .readStream
  .load()
  .select($"value".cast("STRING").as("value"))
  .select(from_json($"value", schema).as("product").as[Product])
  .withColumn("articleType", when(array_contains($"properties.code", "ATY"), $"properties.value")

此方法会创建一个名为 'articleType' 的新列,其中包含 属性 存在时的所有 属性 值,但我只希望 ATY 值的值位于该列中。

基本上我想做这样的事情

properties.filter(_.code == "ATY").map(_.value)

我对 Spark 还很陌生,所以这可能不是正确的方法,但任何指示都会有所帮助。

设法使用 udf 解决了这个问题。

val getArticleType = udf((properties: Seq[Row]) => {
  properties.size.toString
  properties.find(_.getString(2) == "ATY").map(_.getString(1))
}, StringType)

并像这样使用它:

.withColumn("articleType", getArticleType(col("properties")))