How to zip two RDDs (Scala)

I have two RDDs of the following form:

RDD A: (columns: icchID, X_Coord)

[ICCH 1,10.0]
[ICCH 2,10.0]
[ICCH 4,100.0]
[ICCH 4,100.0]
[ICCH 2,13.0]

RDD B: (columns: Y_Coord, Class)

[10.0,A]
[64.0,B]
[39.0,A]
[9.0,C]
[80.0,D]

I want to combine these two RDDs so that I end up with an RDD of the following form:

[ICCH 1,10.0,10.0,A]
[ICCH 2,10.0,64.0,B]
[ICCH 4,100.0,39.0,A]
[ICCH 4,100.0,9.0,C]
[ICCH 2,13.0,80.0,D]

Note that both RDDs have the same number of rows and columns. Is it possible to do this in Scala?

P.S. I'm a Scala newbie, and I'm using Databricks.

You might consider using the RDD zip method together with a transformation via map:
val rddA = sc.parallelize(Seq(
  ("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0), ("ICCH 2", 13.0)
))

val rddB = sc.parallelize(Seq(
  (10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C"), (80.0, "D")
))

val zippedRDD = (rddA zip rddB).map{ case ((id, x), (y, c)) => (id, x, y, c) }
// zippedRDD: org.apache.spark.rdd.RDD[(String, Double, Double, String)] = ...

zippedRDD.collect
// Array[(String, Double, Double, String)] = Array(
//   (ICCH 1,10.0,10.0,A), (ICCH 2,10.0,64.0,B), (ICCH 4,100.0,39.0,A), (ICCH 4,100.0,9.0,C), (ICCH 2,13.0,80.0,D)
// )

Note that preserving the ordering between the two RDDs is a tricky matter; this ordering issue is discussed in related questions elsewhere.
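
If you can't rely on both RDDs keeping their partitioning and per-partition element counts aligned, a common order-independent workaround (just a minimal sketch, reusing the rddA and rddB defined above) is to attach an explicit index with zipWithIndex and join on it:

val indexedA = rddA.zipWithIndex.map { case (row, idx) => (idx, row) }
val indexedB = rddB.zipWithIndex.map { case (row, idx) => (idx, row) }

// join on the index, restore the original order, and flatten the tuples
val joinedRDD = indexedA.join(indexedB)
  .sortByKey()
  .map { case (_, ((id, x), (y, c))) => (id, x, y, c) }

joinedRDD.collect
// should give the same rows as zippedRDD.collect above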

Here is what the Spark API documentation says about the RDD zip method:

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
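
Since the element-count-per-partition assumption is only checked at runtime, one cheap precaution (again, just a sketch) is to verify that the partition counts match before zipping:

require(rddA.getNumPartitions == rddB.getNumPartitions,
  s"partition counts differ: ${rddA.getNumPartitions} vs ${rddB.getNumPartitions}")
// zip will still fail at runtime if the per-partition element counts differ,
// in which case the zipWithIndex + join approach above is the safer route.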