从流中派生新列
Derive new column from stream
我正在使用 Spark Structured Streaming 从 Kafka 读取数据,我正在尝试根据列表的内容创建一个新列。
我这样建模我的数据:
case class Product(properties: List[Property])
case class Property(code: String, value: String)
我是这样读的:
spark
.readStream
.load()
.select($"value".cast("STRING").as("value"))
.select(from_json($"value", schema).as("product").as[Product])
.withColumn("articleType", when(array_contains($"properties.code", "ATY"), $"properties.value")
此方法会创建一个名为 'articleType' 的新列,其中包含 属性 存在时的所有 属性 值,但我只希望 ATY 值的值位于该列中。
基本上我想做这样的事情
properties.filter(_.code == "ATY").map(_.value)
我对 Spark 还很陌生,所以这可能不是正确的方法,但任何指示都会有所帮助。
设法使用 udf 解决了这个问题。
val getArticleType = udf((properties: Seq[Row]) => {
properties.size.toString
properties.find(_.getString(2) == "ATY").map(_.getString(1))
}, StringType)
并像这样使用它:
.withColumn("articleType", getArticleType(col("properties")))
我正在使用 Spark Structured Streaming 从 Kafka 读取数据,我正在尝试根据列表的内容创建一个新列。
我这样建模我的数据:
case class Product(properties: List[Property])
case class Property(code: String, value: String)
我是这样读的:
spark
.readStream
.load()
.select($"value".cast("STRING").as("value"))
.select(from_json($"value", schema).as("product").as[Product])
.withColumn("articleType", when(array_contains($"properties.code", "ATY"), $"properties.value")
此方法会创建一个名为 'articleType' 的新列,其中包含 属性 存在时的所有 属性 值,但我只希望 ATY 值的值位于该列中。
基本上我想做这样的事情
properties.filter(_.code == "ATY").map(_.value)
我对 Spark 还很陌生,所以这可能不是正确的方法,但任何指示都会有所帮助。
设法使用 udf 解决了这个问题。
val getArticleType = udf((properties: Seq[Row]) => {
properties.size.toString
properties.find(_.getString(2) == "ATY").map(_.getString(1))
}, StringType)
并像这样使用它:
.withColumn("articleType", getArticleType(col("properties")))