Apache Spark - 查找 Array/List/Set 个子集
Apache Spark - Finding Array/List/Set subsets
我有 2 个数据帧,每个数据帧都有 Array[String] 作为列之一。对于一个数据框中的每个条目,我需要在另一个数据框中找出子集(如果有的话)。示例如下:
DF1:
----------------------------------------------------
id : Long | labels : Array[String]
---------------------------------------------------
10 | [label1, label2, label3]
11 | [label4, label5]
12 | [label6, label7]
DF2:
----------------------------------------------------
item : String | labels : Array[String]
---------------------------------------------------
item1 | [label1, label2, label3, label4, label5]
item2 | [label4, label5]
item3 | [label4, label5, label6, label7]
经过我描述的子集操作后,预期的o/p应该是
DF3:
----------------------------------------------------
item : String | id : Long
---------------------------------------------------
item1 | [10, 11]
item2 | [11]
item3 | [11, 12]
保证DF2在DF1中总是有相应的子集,所以不会有任何遗留元素。
有人可以在这里提供正确的方法吗?看起来对于 DF2 中的每个元素,我需要扫描 DF1 并在第 2 列上执行子集操作(或设置减法),直到找到所有子集并耗尽该行中的标签,同时累积 [= 的列表28=] 字段。我如何以紧凑高效的方式执行此操作?任何帮助是极大的赞赏。实际上,我可能在 DF1 中有 100 个元素,在 DF2 中有 1000 个元素。
我不知道有什么方法可以有效地执行这种操作。但是,这是使用 UDF
以及笛卡尔连接的一种可能解决方案。
UDF
接受两个序列并检查第一个序列中的所有字符串是否都存在于第二个序列中:
val matchLabel = udf((array1: Seq[String], array2: Seq[String]) => {
array1.forall{x => array2.contains(x)}
})
要使用笛卡尔连接,需要启用它,因为它的计算量很大。
val spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.crossJoin.enabled", true)
这两个数据帧使用 UDF
连接在一起。之后,生成的数据框按 item
列分组,以收集所有 ID 的列表。使用与问题中相同的 DF1
和 DF2
:
val DF3 = DF2.join(DF1, matchLabel(DF1("labels"), DF2("labels")))
.groupBy("item")
.agg(collect_list("id").as("id"))
结果如下:
+-----+--------+
| item| id|
+-----+--------+
|item3|[11, 12]|
|item2| [11]|
|item1|[10, 11]|
+-----+--------+
我有 2 个数据帧,每个数据帧都有 Array[String] 作为列之一。对于一个数据框中的每个条目,我需要在另一个数据框中找出子集(如果有的话)。示例如下:
DF1:
----------------------------------------------------
id : Long | labels : Array[String]
---------------------------------------------------
10 | [label1, label2, label3]
11 | [label4, label5]
12 | [label6, label7]
DF2:
----------------------------------------------------
item : String | labels : Array[String]
---------------------------------------------------
item1 | [label1, label2, label3, label4, label5]
item2 | [label4, label5]
item3 | [label4, label5, label6, label7]
经过我描述的子集操作后,预期的o/p应该是
DF3:
----------------------------------------------------
item : String | id : Long
---------------------------------------------------
item1 | [10, 11]
item2 | [11]
item3 | [11, 12]
保证DF2在DF1中总是有相应的子集,所以不会有任何遗留元素。
有人可以在这里提供正确的方法吗?看起来对于 DF2 中的每个元素,我需要扫描 DF1 并在第 2 列上执行子集操作(或设置减法),直到找到所有子集并耗尽该行中的标签,同时累积 [= 的列表28=] 字段。我如何以紧凑高效的方式执行此操作?任何帮助是极大的赞赏。实际上,我可能在 DF1 中有 100 个元素,在 DF2 中有 1000 个元素。
我不知道有什么方法可以有效地执行这种操作。但是,这是使用 UDF
以及笛卡尔连接的一种可能解决方案。
UDF
接受两个序列并检查第一个序列中的所有字符串是否都存在于第二个序列中:
val matchLabel = udf((array1: Seq[String], array2: Seq[String]) => {
array1.forall{x => array2.contains(x)}
})
要使用笛卡尔连接,需要启用它,因为它的计算量很大。
val spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.crossJoin.enabled", true)
这两个数据帧使用 UDF
连接在一起。之后,生成的数据框按 item
列分组,以收集所有 ID 的列表。使用与问题中相同的 DF1
和 DF2
:
val DF3 = DF2.join(DF1, matchLabel(DF1("labels"), DF2("labels")))
.groupBy("item")
.agg(collect_list("id").as("id"))
结果如下:
+-----+--------+
| item| id|
+-----+--------+
|item3|[11, 12]|
|item2| [11]|
|item1|[10, 11]|
+-----+--------+