Spark中`join`和`union`后跟`groupByKey`的区别?
Difference between `join` and `union` followed by `groupByKey` in Spark?
我找不到很好的理由:
anRDD.join(anotherRDD)
应该不同于:
anRDD.union(anotherRDD).groupByKey()
但是,后者给我一个错误,而前者没有。如果绝对需要,我可以提供一个例子,但我想从功能抽象的角度来了解。我问过的人都不能给我一个很好的解释。
前者和后者的结果集不同:
前任:
(K, V).join(K, W) = (K, (V, W))
前一个结果是等值连接,SQL类比:
anRDD.K = anotherRDD.K
后者:
不仅包括equi-join结果,还包括来自anRDD的非匹配部分和anotherRDD的非匹配部分。
这里有一些要点,我将在下面用一些代码来说明:
join
与两个 rdds 一起工作,每个 rdds 由对组成,并且具有需要匹配的相同密钥。两个rdds的值类型不需要匹配。生成的 rdd 将始终具有 (Key, (Value1, Value2)) 类型的条目
如果 anRDD
和 anotherRDD
具有不同类型的值,anRDD.union(anotherRDD).groupByKey()
将产生错误;如果键和值的类型相同,则不会产生错误。结果将是类型为 (Key, Iterable[Value]) 的条目,其中 Iterable 不需要像 join 那样具有长度 2。
示例:
val rdd1 = sc.parallelize(Seq( ("a", 1) , ("b", 1)))
val rdd2 = sc.parallelize(Seq( ("a", 2) , ("b", 2)))
val rdd3 = sc.parallelize(Seq( ("a", 2.0) , ("b", 2.0))) // different Value type
val rdd4 = sc.parallelize(Seq( ("a", 1) , ("b", 1), ("a", 5) , ("b", 5)))
val rdd5 = sc.parallelize(Seq( ("a", 2) , ("b", 2), ("a", 5) , ("b", 5)))
生成以下内容:
scala> rdd1.join(rdd2)
res18: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[77] at join at <console>:26
scala> rdd1.union(rdd2).groupByKey
res19: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[79] at groupByKey at <console>:26
scala> rdd1.union(rdd3).groupByKey
<console>:26: error: type mismatch;
found : org.apache.spark.rdd.RDD[(String, Double)]
required: org.apache.spark.rdd.RDD[(String, Int)]
rdd1.union(rdd3).groupByKey
然而请注意,如果您在 rdds 中重复键,会产生不同的结果:
scala> rdd4.union(rdd5).groupByKey.collect.mkString("\n")
res21: String =
(a,CompactBuffer(1, 5, 2, 5))
(b,CompactBuffer(1, 5, 2, 5))
scala> rdd4.join(rdd5).collect.mkString("\n")
res22: String =
(a,(1,2))
(a,(1,5))
(a,(5,2))
(a,(5,5))
(b,(1,2))
(b,(1,5))
(b,(5,2))
(b,(5,5))
编辑:OP 使用的是 Python,而不是 Scala。 Python 和 Scala 在类型安全方面存在差异。如上所示,Scala 会捕捉到两个 RDD 之间类型不匹配的情况; Python 不会立即捕获它,但稍后当您尝试将方法应用于错误类型的对象时会产生神秘错误。请记住,Spark 是用 Python API 用 Scala 编写的。
确实,我在评论中尝试了 OP 代码,在 pyspark 中,它可以处理像 count()
这样的简单操作。但是,如果您尝试对每个值求平方(您可以对整数进行计算,但不能对字符串进行计算)
,则会产生错误
这是数据:注意我省略了列表,我只有值 1 和 0。
B = [('b',1), ('c',0)]
C = [('b', 'bs'), ('c', 'cs')]
anRDD = sc.parallelize(B)
anotherRDD = sc.parallelize(C)
这是输出:
>>> anRDD.join(anotherRDD).count()
2
>>> anRDD.union(anotherRDD).groupByKey().count()
2
>>> for y in anRDD.map(lambda (a, x): (a, x*x)).collect():
... print y
...
('b', 1)
('c', 0)
>>> for y in anRDD.union(anotherRDD).map(lambda (a, x): (a, x*x)).collect():
... print y
...
15/12/13 15:18:51 ERROR Executor: Exception in task 5.0 in stage 23.0 (TID 169)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
我找不到很好的理由:
anRDD.join(anotherRDD)
应该不同于:
anRDD.union(anotherRDD).groupByKey()
但是,后者给我一个错误,而前者没有。如果绝对需要,我可以提供一个例子,但我想从功能抽象的角度来了解。我问过的人都不能给我一个很好的解释。
前者和后者的结果集不同:
前任:
(K, V).join(K, W) = (K, (V, W))
前一个结果是等值连接,SQL类比:
anRDD.K = anotherRDD.K
后者:
不仅包括equi-join结果,还包括来自anRDD的非匹配部分和anotherRDD的非匹配部分。
这里有一些要点,我将在下面用一些代码来说明:
join
与两个 rdds 一起工作,每个 rdds 由对组成,并且具有需要匹配的相同密钥。两个rdds的值类型不需要匹配。生成的 rdd 将始终具有 (Key, (Value1, Value2)) 类型的条目
如果 anRDD.union(anotherRDD).groupByKey()
将产生错误;如果键和值的类型相同,则不会产生错误。结果将是类型为 (Key, Iterable[Value]) 的条目,其中 Iterable 不需要像 join 那样具有长度 2。
anRDD
和 anotherRDD
具有不同类型的值,示例:
val rdd1 = sc.parallelize(Seq( ("a", 1) , ("b", 1)))
val rdd2 = sc.parallelize(Seq( ("a", 2) , ("b", 2)))
val rdd3 = sc.parallelize(Seq( ("a", 2.0) , ("b", 2.0))) // different Value type
val rdd4 = sc.parallelize(Seq( ("a", 1) , ("b", 1), ("a", 5) , ("b", 5)))
val rdd5 = sc.parallelize(Seq( ("a", 2) , ("b", 2), ("a", 5) , ("b", 5)))
生成以下内容:
scala> rdd1.join(rdd2)
res18: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[77] at join at <console>:26
scala> rdd1.union(rdd2).groupByKey
res19: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[79] at groupByKey at <console>:26
scala> rdd1.union(rdd3).groupByKey
<console>:26: error: type mismatch;
found : org.apache.spark.rdd.RDD[(String, Double)]
required: org.apache.spark.rdd.RDD[(String, Int)]
rdd1.union(rdd3).groupByKey
然而请注意,如果您在 rdds 中重复键,会产生不同的结果:
scala> rdd4.union(rdd5).groupByKey.collect.mkString("\n")
res21: String =
(a,CompactBuffer(1, 5, 2, 5))
(b,CompactBuffer(1, 5, 2, 5))
scala> rdd4.join(rdd5).collect.mkString("\n")
res22: String =
(a,(1,2))
(a,(1,5))
(a,(5,2))
(a,(5,5))
(b,(1,2))
(b,(1,5))
(b,(5,2))
(b,(5,5))
编辑:OP 使用的是 Python,而不是 Scala。 Python 和 Scala 在类型安全方面存在差异。如上所示,Scala 会捕捉到两个 RDD 之间类型不匹配的情况; Python 不会立即捕获它,但稍后当您尝试将方法应用于错误类型的对象时会产生神秘错误。请记住,Spark 是用 Python API 用 Scala 编写的。
确实,我在评论中尝试了 OP 代码,在 pyspark 中,它可以处理像 count()
这样的简单操作。但是,如果您尝试对每个值求平方(您可以对整数进行计算,但不能对字符串进行计算)
这是数据:注意我省略了列表,我只有值 1 和 0。
B = [('b',1), ('c',0)]
C = [('b', 'bs'), ('c', 'cs')]
anRDD = sc.parallelize(B)
anotherRDD = sc.parallelize(C)
这是输出:
>>> anRDD.join(anotherRDD).count()
2
>>> anRDD.union(anotherRDD).groupByKey().count()
2
>>> for y in anRDD.map(lambda (a, x): (a, x*x)).collect():
... print y
...
('b', 1)
('c', 0)
>>> for y in anRDD.union(anotherRDD).map(lambda (a, x): (a, x*x)).collect():
... print y
...
15/12/13 15:18:51 ERROR Executor: Exception in task 5.0 in stage 23.0 (TID 169)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):