如何在 Spark 2 中搜索结构?

How to search through struct in Spark 2?

我在Spark 2.2.0Scala 2.11.8中有以下两个DataFrames

df1 =

+----------+-------------------------------+
|item      |        other_items            |
+----------+-------------------------------+
|  111     |[[444,1.0],[333,0.5],[666,0.4]]|
|  222     |[[444,1.0],[333,0.5]]          |
|  333     |[]                             |
|  444     |[[111,2.0],[555,0.5],[777,0.2]]|
+----------+-------------------------------+

printScheme 给出以下输出:

 |-- item: string (nullable = true)
 |-- other_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item: string (nullable = true)
 |    |    |-- rank: double (nullable = true)

并且:

df2 = 

+----------+-------------+
|itemA     | itemB       |
+----------+-------------+
|  111     | 333         |
|  222     | 444         |
|  333     | 555         |
|  444     | 777         |
+----------+-------------+

对于 df2 中的每一对,我想从 df1 中找到 rank。为此,我应该在 df1 中找到相同的对,以便 df1.item 等于 df2.itemA 并且 other_items.struct.[item] 等于 df2.itemB。如果找不到这样的对,则排名应为 0。

结果应该是这个:

+----------+-------------+-------------+
|itemA     | itemB       |  rank       |
+----------+-------------+-------------+
|  111     | 333         |   0.5       |
|  222     | 444         |   1.0       |
|  333     | 555         |   0.0       |
|  444     | 777         |   0.2       |
+----------+-------------+-------------+

我该怎么做?

这应该可以满足您的要求。诀窍是在加入之前爆炸other_items:

df2.as("df2").join(
   df1.select($"item", explode($"other_items").as("other_items")).as("df1"),
    $"df2.itemA" === $"df1.item" and $"df2.itemB" === $"df1.other_items.item"
    , "left"
 )
 .select($"itemA", $"itemB", coalesce($"df1.other_items.rank", lit(0.0)).as("rank"))
 .show()

您可以通过定义一个 udf 函数并在 join 两个 dataframe 作为

之后调用该 udf 函数来实现您的要求
import org.apache.spark.sql.functions._
def findRank = udf((items: mutable.WrappedArray[String], ranks: mutable.WrappedArray[Double], itemB: String) => {
  val index = items.indexOf(itemB)
  if(index != -1) ranks(index) else 0.0
})
df1.join(df2, df1("item") === df2("itemA"), "right")
    .select(df2("itemA"), df2("itemB"), findRank(df1("other_items.item"), df1("other_items.rank"), df2("itemB")).as("rank"))
  .show(false)

你应该 dataframe

+-----+-----+----+
|itemA|itemB|rank|
+-----+-----+----+
|111  |333  |0.5 |
|222  |444  |1.0 |
|333  |555  |0.0 |
|444  |777  |0.2 |
+-----+-----+----+