Spark中`join`和`union`后跟`groupByKey`的区别?

Difference between `join` and `union` followed by `groupByKey` in Spark?

我找不到很好的理由:

anRDD.join(anotherRDD)

应该不同于:

anRDD.union(anotherRDD).groupByKey()

但是,后者给我一个错误,而前者没有。如果绝对需要,我可以提供一个例子,但我想从功能抽象的角度来了解。我问过的人都不能给我一个很好的解释。

前者和后者的结果集不同:

  • 前任:

    (K, V).join(K, W) = (K, (V, W))
    

    前一个结果是等值连接,SQL类比:

    anRDD.K = anotherRDD.K
    
  • 后者:

    不仅包括equi-join结果,还包括来自anRDD的非匹配部分和anotherRDD的非匹配部分。

这里有一些要点,我将在下面用一些代码来说明:

  • join 与两个 rdds 一起工作,每个 rdds 由对组成,并且具有需要匹配的相同密钥。两个rdds的值类型不需要匹配。生成的 rdd 将始终具有 (Key, (Value1, Value2))
  • 类型的条目 如果 anRDDanotherRDD 具有不同类型的值,
  • anRDD.union(anotherRDD).groupByKey() 将产生错误;如果键和值的类型相同,则不会产生错误。结果将是类型为 (Key, Iterable[Value]) 的条目,其中 Iterable 不需要像 join 那样具有长度 2。

示例:

val rdd1 = sc.parallelize(Seq(  ("a", 1) , ("b", 1)))
val rdd2 = sc.parallelize(Seq(  ("a", 2) , ("b", 2)))
val rdd3 = sc.parallelize(Seq(  ("a", 2.0) , ("b", 2.0))) // different Value type
val rdd4 = sc.parallelize(Seq(  ("a", 1) , ("b", 1), ("a", 5) , ("b", 5)))
val rdd5 = sc.parallelize(Seq(  ("a", 2) , ("b", 2), ("a", 5) , ("b", 5)))

生成以下内容:

scala> rdd1.join(rdd2)
res18: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[77] at join at <console>:26

scala> rdd1.union(rdd2).groupByKey
res19: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[79] at groupByKey at <console>:26

scala> rdd1.union(rdd3).groupByKey
<console>:26: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Double)]
 required: org.apache.spark.rdd.RDD[(String, Int)]
              rdd1.union(rdd3).groupByKey

然而请注意,如果您在 rdds 中重复键,会产生不同的结果:

scala> rdd4.union(rdd5).groupByKey.collect.mkString("\n")
res21: String = 
(a,CompactBuffer(1, 5, 2, 5))
(b,CompactBuffer(1, 5, 2, 5))

scala> rdd4.join(rdd5).collect.mkString("\n")
res22: String = 
(a,(1,2))
(a,(1,5))
(a,(5,2))
(a,(5,5))
(b,(1,2))
(b,(1,5))
(b,(5,2))
(b,(5,5))

编辑:OP 使用的是 Python,而不是 Scala。 Python 和 Scala 在类型安全方面存在差异。如上所示,Scala 会捕捉到两个 RDD 之间类型不匹配的情况; Python 不会立即捕获它,但稍后当您尝试将方法应用于错误类型的对象时会产生神秘错误。请记住,Spark 是用 Python API 用 Scala 编写的。

确实,我在评论中尝试了 OP 代码,在 pyspark 中,它可以处理像 count() 这样的简单操作。但是,如果您尝试对每个值求平方(您可以对整数进行计算,但不能对字符串进行计算)

,则会产生错误

这是数据:注意我省略了列表,我只有值 1 和 0。

B = [('b',1), ('c',0)]
C = [('b', 'bs'), ('c', 'cs')]
anRDD = sc.parallelize(B)
anotherRDD = sc.parallelize(C)

这是输出:

>>> anRDD.join(anotherRDD).count()
2
>>> anRDD.union(anotherRDD).groupByKey().count()
2
>>> for y in anRDD.map(lambda (a, x): (a, x*x)).collect():
...   print y
... 
('b', 1)
('c', 0)
>>> for y in anRDD.union(anotherRDD).map(lambda (a, x): (a, x*x)).collect():
...   print y
... 
15/12/13 15:18:51 ERROR Executor: Exception in task 5.0 in stage 23.0 (TID 169)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):