How to reduceByKey?
I am using the Spark console in the Cloudera QuickStart VM.
A sample of the output is provided below; it shows the first 20 records. Each record is a combination of a TV channel name and its corresponding number of viewers. There are a few hundred records in total.
The goal is to group this RDD (channel_views) by TV channel name, so that each record is a unique channel name together with the sum of its corresponding viewer counts.
channel_views = joined_dataset.map(extract_channel_views)
Below is the code I have been struggling with to produce the desired output/goal described above:
def some_function(a, b):
    some_result = a + b
    return some_result
channel_views.reduceByKey(some_function).collect()
Output of the following code:
channel_views.take(20)
[(1038, u'DEF'),
(1038, u'CNO'),
(1038, u'CNO'),
(1038, u'NOX'),
(1038, u'MAN'),
(1038, u'MAN'),
(1038, u'XYZ'),
(1038, u'BAT'),
(1038, u'CAB'),
(1038, u'DEF'),
(415, u'DEF'),
(415, u'CNO'),
(415, u'CNO'),
(415, u'NOX'),
(415, u'MAN'),
(415, u'MAN'),
(415, u'XYZ'),
(415, u'BAT'),
(415, u'CAB'),
(415, u'DEF')]
You are working with a dataset that is backwards. Use a map (or change your extract function) to swap the tuples from (count, name) to (name, count). The *ByKey methods use the first item in the tuple as the key, so your code, as written, would concatenate the strings, keyed by the counts.
I don't know Python, so I did it in Scala; you can convert it to Python. Here you go:
scala> val input = sc.parallelize(Seq((1038, "DEF"),
| (1038, "CNO"),
| (1038, "CNO"),
| (1038, "NOX"),
| (1038, "MAN"),
| (1038, "MAN"),
| (1038, "XYZ"),
| (1038, "BAT"),
| (1038, "CAB"),
| (1038, "DEF"),
| (415, "DEF"),
| (415, "CNO"),
| (415, "CNO"),
| (415, "NOX"),
| (415, "MAN"),
| (415, "MAN"),
| (415, "XYZ"),
| (415, "BAT"),
| (415, "CAB"),
| (415, "DEF"))
| )
input: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[12] at parallelize at <console>:22
scala> val data = input.map( v => (v._2,v._1) )
data: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:24
scala> data.foreach(println)
(BAT,1038)
(DEF,415)
(CNO,415)
(BAT,415)
(CAB,415)
(DEF,415)
(MAN,1038)
(XYZ,1038)
(CNO,1038)
(NOX,1038)
(DEF,1038)
(MAN,1038)
(CNO,415)
(MAN,415)
(CAB,1038)
(XYZ,415)
(NOX,415)
(CNO,1038)
(MAN,415)
(DEF,1038)
scala> val result = data.reduceByKey( (x,y) => x+y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:26
scala> result.foreach(println)
(NOX,1453)
(MAN,2906)
(CNO,2906)
(CAB,1453)
(DEF,2906)
(BAT,1453)
(XYZ,1453)
scala>
Here is the pyspark code:
for i in channel_views.map(lambda rec: (rec[1], rec[0])).reduceByKey(lambda acc, value: acc + value).collect(): print(i)
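For reference, here is a self-contained PySpark sketch of the same flow, run against the sample values shown in the question. It assumes sc is the SparkContext already provided by the shell; the variable names (sample, totals) are only for illustration, and the order of the collected results may vary.

# Build an RDD with the same (count, name) shape as channel_views.
sample = sc.parallelize([
    (1038, u'DEF'), (1038, u'CNO'), (1038, u'CNO'), (1038, u'NOX'),
    (1038, u'MAN'), (1038, u'MAN'), (1038, u'XYZ'), (1038, u'BAT'),
    (1038, u'CAB'), (1038, u'DEF'),
    (415, u'DEF'), (415, u'CNO'), (415, u'CNO'), (415, u'NOX'),
    (415, u'MAN'), (415, u'MAN'), (415, u'XYZ'), (415, u'BAT'),
    (415, u'CAB'), (415, u'DEF'),
])

# Swap each tuple to (name, count) so the channel name becomes the key,
# then sum the counts per channel.
totals = sample.map(lambda rec: (rec[1], rec[0])).reduceByKey(lambda a, b: a + b)

print(totals.collect())
# e.g. (order may vary):
# [('NOX', 1453), ('MAN', 2906), ('CNO', 2906), ('CAB', 1453),
#  ('DEF', 2906), ('BAT', 1453), ('XYZ', 1453)]

The totals match the Scala output above: each channel appears twice per batch, so DEF, CNO, and MAN sum to 2 * (1038 + 415) = 2906 and the rest to 1038 + 415 = 1453.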