Order Spark RDD based on ordering in another RDD

I have an RDD of strings like this (ordered in a specific way):

["A","B","C","D"]

And another RDD whose records are lists like these:

["C","B","F","K"],
["B","A","Z","M"],
["X","T","D","C"]

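I want to sort the elements within each list of the second RDD according to the order in which they appear in the first RDD. The order of elements that do not appear in the first RDD does not matter.

Given the example above, I would like to get an RDD like this: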

["B","C","F","K"],
["A","B","Z","M"],
["C","D","X","T"]

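I know I should use a broadcast variable to broadcast the first RDD while processing each list in the second RDD. But I am very new to Spark/Scala (and to functional programming in general), so I am not sure how to do it.

I assume the first RDD is small, since you talk about broadcasting it. In that case you are right: broadcasting the ordering is a good way to solve your problem.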

// generating data
val ordering_rdd = sc.parallelize(Seq("A","B","C","D"))
val other_rdd = sc.parallelize(Seq(
    Seq("C","B","F","K"),
    Seq("B","A","Z","M"),
    Seq("X","T","D","C")
))
// let's start by collecting the ordering onto the driver
val ordering = ordering_rdd.collect()
// Let's broadcast the list:
val ordering_br = sc.broadcast(ordering)

// Finally, let's use the ordering to sort your records:
val result = other_rdd.map(_.sortBy { x =>
    // indexOf returns -1 for elements missing from the ordering; push those to the end
    val index = ordering_br.value.indexOf(x)
    if (index == -1) Int.MaxValue else index
})

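Note that indexOf returns -1 if the element is not found in the list. If we left it at that, all the elements that were not found would end up at the beginning. I understand that you want them at the end, so I replace -1 with a very large value (Int.MaxValue).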
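To make the effect of the -1 sentinel concrete, here is a minimal sketch in plain Scala collections (no Spark needed). The names row, naive and fixed are made up for this illustration; the sort key is the same one used inside the map above.

```scala
// Illustration only: plain Scala collections stand in for one RDD record.
val ordering = Array("A", "B", "C", "D")
val row = Seq("C", "B", "F", "K")

// Using indexOf directly: missing elements get key -1 and sort first.
val naive = row.sortBy(x => ordering.indexOf(x))

// Remapping -1 to Int.MaxValue pushes missing elements to the end instead.
val fixed = row.sortBy { x =>
  val index = ordering.indexOf(x)
  if (index == -1) Int.MaxValue else index
}

println(naive) // List(F, K, B, C)
println(fixed) // List(B, C, F, K)
```

Since sortBy is a stable sort, elements that share the same key (here, all the missing ones) keep their original relative order.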

Printing the contents of result:

scala> result.collect().foreach(println)
List(B, C, F, K)
List(A, B, Z, M)
List(C, D, X, T)
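As a possible refinement (not part of the approach above, just a sketch): indexOf scans the ordering for every element, so if the ordering were long it might be worth precomputing an element-to-position map once and looking elements up in it. The names rank and sortByRank are hypothetical, and the example uses plain Scala collections so it can be run without a cluster.

```scala
// Sketch under assumptions: `rank` and `sortByRank` are illustrative names.
val ordering = Array("A", "B", "C", "D")

// Build element -> position once. If the ordering contained duplicates,
// toMap would keep the last position, unlike indexOf which returns the first.
val rank: Map[String, Int] = ordering.zipWithIndex.toMap

// Elements missing from the map get Int.MaxValue and therefore sort last.
def sortByRank(row: Seq[String]): Seq[String] =
  row.sortBy(x => rank.getOrElse(x, Int.MaxValue))

val rows = Seq(
  Seq("C", "B", "F", "K"),
  Seq("B", "A", "Z", "M"),
  Seq("X", "T", "D", "C")
)
val sortedRows = rows.map(sortByRank)
sortedRows.foreach(println)
```

In the Spark version one would presumably broadcast the map instead of the array and call the same sortBy inside other_rdd.map; for an ordering of only four elements the difference is negligible.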