How to reduceByKey?

I am using the Spark console in the Cloudera QuickStart VM.

A sample of the output is provided below, showing the first 20 records. Each record is a combination of a TV channel name and its corresponding number of viewers. There are several hundred records in total.

The goal is to group this RDD (channel_views) by TV channel name, so that each record is a unique TV channel name paired with the sum of its corresponding viewer counts.

channel_views = joined_dataset.map(extract_channel_views)

Below is the code I have been working with to try to produce the desired output/goal described above:

def some_function(a,b):
  some_result = a + b
  return some_result

channel_views.reduceByKey(some_function).collect()

The output of the following code:

channel_views.take(20)

[(1038, u'DEF'),  
 (1038, u'CNO'),  
 (1038, u'CNO'),  
 (1038, u'NOX'),  
 (1038, u'MAN'),  
 (1038, u'MAN'),  
 (1038, u'XYZ'),  
 (1038, u'BAT'),  
 (1038, u'CAB'),  
 (1038, u'DEF'),  
 (415, u'DEF'),  
 (415, u'CNO'),  
 (415, u'CNO'),  
 (415, u'NOX'),  
 (415, u'MAN'),  
 (415, u'MAN'),  
 (415, u'XYZ'),  
 (415, u'BAT'),  
 (415, u'CAB'),  
 (415, u'DEF')]

You are working with the dataset backwards. Use map (or change your extract function) to swap the tuples from (count, name) to (name, count).

The *byKey methods use the first item in the tuple as the key, so as written your code would concatenate the channel-name strings, keyed by the counts, as illustrated in the sketch below.
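
For illustration, a minimal PySpark sketch of that failure mode (assuming a SparkContext named sc, as in the shell, and a tiny sample of the data above):

# With (count, name) tuples, the count is the key,
# so reduceByKey with + concatenates the names.
pairs = sc.parallelize([(1038, u'DEF'), (1038, u'CNO'), (415, u'DEF')])
pairs.reduceByKey(lambda a, b: a + b).collect()
# e.g. [(1038, u'DEFCNO'), (415, u'DEF')]  (order within a key may vary)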

I don't know Python, so I did it in Scala; you can convert it to Python. So, here you go:

scala> val input = sc.parallelize(Seq((1038, "DEF"),
     | (1038, "CNO"),
     | (1038, "CNO"),
     | (1038, "NOX"),
     | (1038, "MAN"),
     | (1038, "MAN"),
     | (1038, "XYZ"),
     | (1038, "BAT"),
     | (1038, "CAB"),
     | (1038, "DEF"),
     | (415, "DEF"),
     | (415, "CNO"),
     | (415, "CNO"),
     | (415, "NOX"),
     | (415, "MAN"),
     | (415, "MAN"),
     | (415, "XYZ"),
     | (415, "BAT"),
     | (415, "CAB"),
     | (415, "DEF"))
     | )
input: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[12] at parallelize at <console>:22

scala> val data = input.map( v => (v._2,v._1) )
data: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:24

scala> data.foreach(println)
(BAT,1038)
(DEF,415)
(CNO,415)
(BAT,415)
(CAB,415)
(DEF,415)
(MAN,1038)
(XYZ,1038)
(CNO,1038)
(NOX,1038)
(DEF,1038)
(MAN,1038)
(CNO,415)
(MAN,415)
(CAB,1038)
(XYZ,415)
(NOX,415)
(CNO,1038)
(MAN,415)
(DEF,1038)

scala> val result = data.reduceByKey( (x,y) => x+y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:26

scala> result.foreach(println)
(NOX,1453)
(MAN,2906)
(CNO,2906)
(CAB,1453)
(DEF,2906)
(BAT,1453)
(XYZ,1453)

scala>

Here is the PySpark code:

for i in channel_views.map(lambda rec: (rec[1], rec[0])).reduceByKey(lambda acc, value: acc + value).collect():
    print(i)
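
With the swap in place, the same pattern sums the viewer counts per channel. A minimal sketch on the sample above (again assuming a SparkContext named sc):

# With (name, count) tuples, the name is the key,
# so reduceByKey with + sums the viewer counts.
pairs = sc.parallelize([(1038, u'DEF'), (1038, u'CNO'), (415, u'DEF')])
pairs.map(lambda rec: (rec[1], rec[0])).reduceByKey(lambda a, b: a + b).collect()
# e.g. [(u'DEF', 1453), (u'CNO', 1038)]

Note that collect() brings the entire result back to the driver. That is fine here, since reduceByKey leaves only one record per channel, but for larger keyed results take(n) or saveAsTextFile(...) would be safer than collecting everything.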