Array[RDD[(String, Set[String])]] Spark Scala 中的转换
Array[RDD[(String, Set[String])]] transformation in Spark Scala
我有一个 Array[RDD[(String, Set[String])]] 类型的 RDD 数组,其中每个 RDD 都是键和值的元组。
键是字符串,值是 Set[String],我想 merge/union 设置相同的键。我正在尝试在 Scala 中执行此操作,但没有任何乐趣。你能帮帮我吗
e.g.
RDD["A",Set("1","2")]
RDD["A",Set("3","4")]
RDD["B",Set("1","2")]
RDD["B",Set("3","4")]
RDD["C",Set("1","2")]
RDD["C",Set("3","4")]
After transformation:
RDD["A",Set("1","2","3","4")]
RDD["B",Set("1","2","3","4")]
RDD["C",Set("1","2","3","4")]
如果单个 RDD
作为输出没问题(真的看不出有任何理由制作许多只有 1 条记录的 RDD),您可以减少 Array
的 RDD
变成一个 RDD
然后做一个 groupByKey
:
arr.reduce( _ ++ _ )
.groupByKey
.mapValues(_.flatMap(identity))
示例:
scala> val x = sc.parallelize( List( ("A", Set(1,2)) ) )
scala> val x2 = sc.parallelize( List( ("A", Set(3,4)) ) )
scala> val arr = Array(x,x2)
arr: Array[org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Set[Int])]] = Array(ParallelCollectionRDD[0] at parallelize at <console>:27, ParallelCollectionRDD[1] at parallelize at <console>:27)
scala> arr.reduce( _ ++ _ ).groupByKey.mapValues(_.flatMap(identity)).foreach(println)
(A,List(1, 2, 3, 4))
@Edit:我发现这是一个非常糟糕的主意,建议您重新考虑它,但是您可以通过从上面获取所有键并多次过滤 RDD 来获得您想要的结果:
val sub = arr.reduce( _ ++ _ ).groupByKey.mapValues(_.flatMap(identity))
val keys = sub.map(_._1).collect()
val result = for(k <- keys) yield sub.filter(_._1 == k)
result: Array[org.apache.spark.rdd.RDD[(String, Iterable[Int])]]
每个RDD
都会有一个元组,真的不觉得它很有用,性能。
我有一个 Array[RDD[(String, Set[String])]] 类型的 RDD 数组,其中每个 RDD 都是键和值的元组。 键是字符串,值是 Set[String],我想 merge/union 设置相同的键。我正在尝试在 Scala 中执行此操作,但没有任何乐趣。你能帮帮我吗
e.g.
RDD["A",Set("1","2")]
RDD["A",Set("3","4")]
RDD["B",Set("1","2")]
RDD["B",Set("3","4")]
RDD["C",Set("1","2")]
RDD["C",Set("3","4")]
After transformation:
RDD["A",Set("1","2","3","4")]
RDD["B",Set("1","2","3","4")]
RDD["C",Set("1","2","3","4")]
如果单个 RDD
作为输出没问题(真的看不出有任何理由制作许多只有 1 条记录的 RDD),您可以减少 Array
的 RDD
变成一个 RDD
然后做一个 groupByKey
:
arr.reduce( _ ++ _ )
.groupByKey
.mapValues(_.flatMap(identity))
示例:
scala> val x = sc.parallelize( List( ("A", Set(1,2)) ) )
scala> val x2 = sc.parallelize( List( ("A", Set(3,4)) ) )
scala> val arr = Array(x,x2)
arr: Array[org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Set[Int])]] = Array(ParallelCollectionRDD[0] at parallelize at <console>:27, ParallelCollectionRDD[1] at parallelize at <console>:27)
scala> arr.reduce( _ ++ _ ).groupByKey.mapValues(_.flatMap(identity)).foreach(println)
(A,List(1, 2, 3, 4))
@Edit:我发现这是一个非常糟糕的主意,建议您重新考虑它,但是您可以通过从上面获取所有键并多次过滤 RDD 来获得您想要的结果:
val sub = arr.reduce( _ ++ _ ).groupByKey.mapValues(_.flatMap(identity))
val keys = sub.map(_._1).collect()
val result = for(k <- keys) yield sub.filter(_._1 == k)
result: Array[org.apache.spark.rdd.RDD[(String, Iterable[Int])]]
每个RDD
都会有一个元组,真的不觉得它很有用,性能。