根据给定条件从数组 <struct> 中选择一行

Selecting a row from array<struct> based on given condition

我有一个具有以下架构的数据框 -

|-- ID: string (nullable = true)
|-- VALUES: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _v1: string (nullable = true)
|    |    |-- _v2: string (nullable = true)

VALUES 就像 -

[["ABC","a"],["PQR","c"],["XYZ","b"],["TUV","d"]]
[["PQR","g"]]
[["TUV","f"],["ABC","e"]]

我必须 select 根据 _v1 的值从此数组中提取一个结构。这些值中有一个层次结构,例如 -

"ABC" -> "XYZ" -> "PQR" -> "TUV"

现在,如果 "TUV" 存在,我们将 select _v1 中包含 "TUV" 的行。否则我们将检查 "PQR"。如果存在 "PQR",则取其行。否则检查 "XYZ" 等等。

结果 df 应该类似于 -(现在是 StructType,而不是 Array[Struct])

["TUV","d"]
["PQR","g"]
["TUV","f"]

有人可以指导我如何通过创建 udf 来解决这个问题吗? 提前致谢。

你可以像下面那样做

import org.apache.spark.sql.functions._
def getIndex = udf((array : mutable.WrappedArray[String]) => {
  if(array.contains("TUV")) array.indexOf("TUV")
  else if(array.contains("PQR")) array.indexOf("PQR")
  else if(array.contains("XYZ")) array.indexOf("XYZ")
  else if(array.contains("ABC")) array.indexOf("ABC")
  else 0
})

df.select($"VALUES"(getIndex($"VALUES._v1")).as("selected"))

你应该有以下输出

+--------+
|selected|
+--------+
|[TUV,d] |
|[PQR,g] |
|[TUV,f] |
+--------+

希望回答对你有帮助

已更新

您可以使用 . 表示法 select struct 列的元素。这里 $"VALUES._v1" 是 selecting 所有 struct_v1 并以相同的顺序将它们传递给 udf 作为 Array 函数。

例如:对于 [["ABC","a"],["PQR","c"],["XYZ","b"],["TUV","d"]]$"VALUES._v1" 将 return ["ABC","PQR","XYZ","TUV"] 传递给 udf 函数

udf 函数中,匹配字符串的数组索引被 return 编辑。例如:对于 ["ABC","PQR","XYZ","TUV"], "TUV" 匹配,所以它将 return 3.

对于第一行,getIndex($"VALUES._v1") 会 return 3 所以 $"VALUES"(getIndex($"VALUES._v1") 等同于 $"VALUES"(3),这是 [=19= 的第四个元素] 即 ["TUV","d"]

希望解释清楚。

只要每行最多只包含每个 _v1 值一次,这就应该有效。 UDF 将 return hierarchy 列表中最佳值的索引。然后在 _v1 中包含此值的结构将被选择并放入 "select" 列。

val hierarchy = List("TUV", "PQR", "XYZ", "ABC")

val findIndex = udf((rows: Seq[String]) => {
  val s = rows.toSet
  val best = hierarchy.filter(h => s contains h).head
  rows.indexOf(best)
})

df.withColumn("select", $"VALUES"(findIndex($"VALUES._v2")))

订单使用列表,方便扩展到 4 个以上的值。