在 Spark 上创建组合键
Create Composite Key on Spark
我正在研究 Spark 上的基本协同过滤算法,但我遇到了 RDD 转换问题。我的输入 RDD 是这样的:
["John", "a", "5"], ["John", "b", "3"],["John", "c", "2"],["Mark", "a", "3"] ["Mark", "b", "4"] ["Lucy", "b", "2"] ["Lucy", "c", "5"]
在每个RDD元素中,第一个值是用户,第二个值是产品名称("a"、"b"或"c"),第三个值是它的评级.
我想通过按名称分组,然后按产品组合来转换输入 RDD,所以我的最终结果 RDD 将是
[("a","b"),("5","2")] [("a","b"),("3","4")] [("a","c"),("5","2")]
在上面的结果中,因为 John 和 Mark 在 a 和 b 上都有 "rating",所以我有两个 RDD 元素,以 (a,b) 为键,他们的评分为值。只有 John 对 a 和 c 都有评级,因此我只有一个以 (a,c) 为键的 RDD 元素。
您可以执行以下操作:
val keyedElems = rdd1.map { case (a, b, c) => (a, (b, c)) }
val groupedCombinations = keyedElems.groupByKey().flatMapValues(_.toList.combinations(2))
val productScoreCombinations = groupedCombinations.mapValues { case (elems: List[(String, String)]) => ((elems(0)._1, elems(1)._1), (elems(0)._2, elems(1)._2)) }.values
我们在这里做的是按用户键入您的输入数据集,通过按键分组生成(产品,评级)的可迭代列表,生成每个列表的 2 个组合,展平该列表以将每个组合放入它自己的记录,最后重新排序元素以在它们自己的元组中包含产品和评级。
当 运行 在 Spark 本地时,我看到以下内容:
scala> val rdd1 = sc.parallelize(Array(("John", "a", "5"),("John", "b", "3"),("John", "c", "2"),("Mark", "a", "3"),("Mark", "b", "4"),("Lucy", "b", "2"),("Lucy", "c", "5")))
rdd1: org.apache.spark.rdd.RDD[(String, String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val rdd2 = rdd1.map { case (a, b, c) => (a, (b, c)) }
rdd2: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[1] at map at <console>:23
scala> val rdd3 = rdd2.groupByKey().flatMapValues(_.toList.combinations(2))
rdd3: org.apache.spark.rdd.RDD[(String, List[(String, String)])] = MapPartitionsRDD[3] at flatMapValues at <console>:25
scala> val rdd4 = rdd3.mapValues { case (elems: List[(String, String)]) => ((elems(0)._1, elems(1)._1), (elems(0)._2, elems(1)._2)) }.values
rdd4: org.apache.spark.rdd.RDD[((String, String), (String, String))] = MapPartitionsRDD[7] at values at <console>:27
scala> rdd4.foreach(println)
...
((a,b),(3,4))
((b,c),(2,5))
((a,b),(5,3))
((a,c),(5,2))
((b,c),(3,2))
您可以 运行 对此进行简单筛选,以查找包含产品 "a" 的所有行。
(编辑:)
我没有注意到您已将其标记为 pyspark,因此我更新了下面的 python 解决方案(基本上从上面的 scala 映射而来):
import itertools
keyedElems = input.map(lambda x: (x[0], (x[1], x[2])))
groupedCombinations = keyedElems.groupByKey().flatMapValues(lambda arr: itertools.combinations(arr, 2))
productScoreCombinations = groupedCombinations.mapValues(lambda elems: ((elems[0][0], elems[1][0]), (elems[0][1], elems[1][1]))).map(lambda x: x[1])
当我 运行 上述代码时,我在 pyspark 中看到以下内容:
>>> input = sc.parallelize([("John", "a", "5"),("John", "b", "3"),("John", "c", "2"),("Mark", "a", "3"),("Mark", "b", "4"),("Lucy", "b", "2"),("Lucy", "c", "5")])
...
>>> productScoreCombinations.take(6)
...
[(('b', 'c'), ('2', '5')), (('a', 'b'), ('5', '3')), (('a', 'c'), ('5', '2')), (('b', 'c'), ('3', '2')), (('a', 'b'), ('3', '4'))]
我正在研究 Spark 上的基本协同过滤算法,但我遇到了 RDD 转换问题。我的输入 RDD 是这样的:
["John", "a", "5"], ["John", "b", "3"],["John", "c", "2"],["Mark", "a", "3"] ["Mark", "b", "4"] ["Lucy", "b", "2"] ["Lucy", "c", "5"]
在每个RDD元素中,第一个值是用户,第二个值是产品名称("a"、"b"或"c"),第三个值是它的评级.
我想通过按名称分组,然后按产品组合来转换输入 RDD,所以我的最终结果 RDD 将是
[("a","b"),("5","2")] [("a","b"),("3","4")] [("a","c"),("5","2")]
在上面的结果中,因为 John 和 Mark 在 a 和 b 上都有 "rating",所以我有两个 RDD 元素,以 (a,b) 为键,他们的评分为值。只有 John 对 a 和 c 都有评级,因此我只有一个以 (a,c) 为键的 RDD 元素。
您可以执行以下操作:
val keyedElems = rdd1.map { case (a, b, c) => (a, (b, c)) }
val groupedCombinations = keyedElems.groupByKey().flatMapValues(_.toList.combinations(2))
val productScoreCombinations = groupedCombinations.mapValues { case (elems: List[(String, String)]) => ((elems(0)._1, elems(1)._1), (elems(0)._2, elems(1)._2)) }.values
我们在这里做的是按用户键入您的输入数据集,通过按键分组生成(产品,评级)的可迭代列表,生成每个列表的 2 个组合,展平该列表以将每个组合放入它自己的记录,最后重新排序元素以在它们自己的元组中包含产品和评级。
当 运行 在 Spark 本地时,我看到以下内容:
scala> val rdd1 = sc.parallelize(Array(("John", "a", "5"),("John", "b", "3"),("John", "c", "2"),("Mark", "a", "3"),("Mark", "b", "4"),("Lucy", "b", "2"),("Lucy", "c", "5")))
rdd1: org.apache.spark.rdd.RDD[(String, String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val rdd2 = rdd1.map { case (a, b, c) => (a, (b, c)) }
rdd2: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[1] at map at <console>:23
scala> val rdd3 = rdd2.groupByKey().flatMapValues(_.toList.combinations(2))
rdd3: org.apache.spark.rdd.RDD[(String, List[(String, String)])] = MapPartitionsRDD[3] at flatMapValues at <console>:25
scala> val rdd4 = rdd3.mapValues { case (elems: List[(String, String)]) => ((elems(0)._1, elems(1)._1), (elems(0)._2, elems(1)._2)) }.values
rdd4: org.apache.spark.rdd.RDD[((String, String), (String, String))] = MapPartitionsRDD[7] at values at <console>:27
scala> rdd4.foreach(println)
...
((a,b),(3,4))
((b,c),(2,5))
((a,b),(5,3))
((a,c),(5,2))
((b,c),(3,2))
您可以 运行 对此进行简单筛选,以查找包含产品 "a" 的所有行。
(编辑:)
我没有注意到您已将其标记为 pyspark,因此我更新了下面的 python 解决方案(基本上从上面的 scala 映射而来):
import itertools
keyedElems = input.map(lambda x: (x[0], (x[1], x[2])))
groupedCombinations = keyedElems.groupByKey().flatMapValues(lambda arr: itertools.combinations(arr, 2))
productScoreCombinations = groupedCombinations.mapValues(lambda elems: ((elems[0][0], elems[1][0]), (elems[0][1], elems[1][1]))).map(lambda x: x[1])
当我 运行 上述代码时,我在 pyspark 中看到以下内容:
>>> input = sc.parallelize([("John", "a", "5"),("John", "b", "3"),("John", "c", "2"),("Mark", "a", "3"),("Mark", "b", "4"),("Lucy", "b", "2"),("Lucy", "c", "5")])
...
>>> productScoreCombinations.take(6)
...
[(('b', 'c'), ('2', '5')), (('a', 'b'), ('5', '3')), (('a', 'c'), ('5', '2')), (('b', 'c'), ('3', '2')), (('a', 'b'), ('3', '4'))]