无法理解 aggregateByKey 和 combineByKey 的工作原理
Can't understand the working of aggregateByKey and combineByKey
我是 Apache Spark 的初学者。目前我正在尝试使用 Python 学习各种聚合。
为了给我所面临的问题提供一些背景信息,我发现很难理解 aggregateByKey 函数的工作原理以通过 "status" 计算订单数。
我正在关注 ITVersity 的 YouTube 播放列表,下面是我正在使用的代码和一些示例输出。
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
for i in ordersRDD.take(10): print(i)
输出:
1,2013-07-25 00:00:00.0,11599,关闭
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,完成
4,2013-07-25 00:00:00.0,8827,关闭
5,2013-07-25 00:00:00.0,11318,完成
6,2013-07-25 00:00:00.0,7130,完成
7,2013-07-25 00:00:00.0,4530,完成
8,2013-07-25 00:00:00.0,2911,处理中
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT
ordersMap = ordersRDD.map(lambda x: (x.split(",")[3], x))
输出:
(u'关闭', u'1,2013-07-25 00:00:00.0,11599,关闭')
(u'PENDING_PAYMENT', u'2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT')
(u'完成', u'3,2013-07-25 00:00:00.0,12111,完成')
(u'关闭', u'4,2013-07-25 00:00:00.0,8827,关闭')
(u'完成', u'5,2013-07-25 00:00:00.0,11318,完成')
(u'完成', u'6,2013-07-25 00:00:00.0,7130,完成')
(u'完成', u'7,2013-07-25 00:00:00.0,4530,完成')
(u'处理', u'8,2013-07-25 00:00:00.0,2911,处理')
(u'PENDING_PAYMENT', u'9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT')
(u'PENDING_PAYMENT', u'10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT')
ordersByStatus = ordersMap.aggregateByKey(0, lambda acc, val: acc + 1, lambda acc,val: acc + val)
for i in ordersByStatus.take(10): print(i)
最终输出:
(u'SUSPECTED_FRAUD', 1558)
(u'取消', 1428)
(u'完成', 22899)
(u'PENDING_PAYMENT', 15030)
(u'PENDING', 7610)
(u'关闭', 7556)
(u'ON_HOLD', 3798)
(u'处理中', 8275)
(u'PAYMENT_REVIEW', 729)
我理解困难的问题是:
1、为什么2个lambda函数中取的aggregateByKey函数作为参数?
2. 可视化第一个 lambda 函数的作用?
3. 形象化第二个 lambda 函数的作用?
如果您能用一些简单的框图向我解释上述问题以及 aggregateByKey 的工作原理,那将非常有帮助?或许中间算了几个?
感谢您的帮助!
谢谢,
希夫
Spark RDDs被划分为分区,所以当你对所有数据进行聚合函数时,你首先会聚合每个分区内的数据(分区只是数据的细分)。然后,您需要告诉 Spark 如何聚合分区。
第一个 lambda 函数告诉 Spark 在遇到新值时如何更改 运行 计数(累加器)。因为你在计数,所以你只需将 1 添加到累加器。在一个切片中,如果 运行 计数当前为 4,并且添加了另一个值,则 运行 计数应为 4 + 1 = 5
。因此,您的第一个 lambda 函数是:
lambda acc, val: acc + 1
第二个 lambda 函数告诉 Spark 如何将一个数据切片的 运行 计数与另一数据切片的 运行 计数结合起来。如果一个切片的计数为 5,第二个切片的计数为 7,则合并后的计数为 5 + 7 = 12
。所以你的第二个函数最好写成:
lambda acc1, acc2: acc1 + acc2
剩下的唯一微妙之处是一切都是在 "by key" 的基础上完成的。累加器(计数)因密钥而异。
我是 Apache Spark 的初学者。目前我正在尝试使用 Python 学习各种聚合。
为了给我所面临的问题提供一些背景信息,我发现很难理解 aggregateByKey 函数的工作原理以通过 "status" 计算订单数。
我正在关注 ITVersity 的 YouTube 播放列表,下面是我正在使用的代码和一些示例输出。
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
for i in ordersRDD.take(10): print(i)
输出:
1,2013-07-25 00:00:00.0,11599,关闭
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,完成
4,2013-07-25 00:00:00.0,8827,关闭
5,2013-07-25 00:00:00.0,11318,完成
6,2013-07-25 00:00:00.0,7130,完成
7,2013-07-25 00:00:00.0,4530,完成
8,2013-07-25 00:00:00.0,2911,处理中
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT
ordersMap = ordersRDD.map(lambda x: (x.split(",")[3], x))
输出:
(u'关闭', u'1,2013-07-25 00:00:00.0,11599,关闭')
(u'PENDING_PAYMENT', u'2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT')
(u'完成', u'3,2013-07-25 00:00:00.0,12111,完成')
(u'关闭', u'4,2013-07-25 00:00:00.0,8827,关闭')
(u'完成', u'5,2013-07-25 00:00:00.0,11318,完成')
(u'完成', u'6,2013-07-25 00:00:00.0,7130,完成')
(u'完成', u'7,2013-07-25 00:00:00.0,4530,完成')
(u'处理', u'8,2013-07-25 00:00:00.0,2911,处理')
(u'PENDING_PAYMENT', u'9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT')
(u'PENDING_PAYMENT', u'10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT')
ordersByStatus = ordersMap.aggregateByKey(0, lambda acc, val: acc + 1, lambda acc,val: acc + val)
for i in ordersByStatus.take(10): print(i)
最终输出:
(u'SUSPECTED_FRAUD', 1558)
(u'取消', 1428)
(u'完成', 22899)
(u'PENDING_PAYMENT', 15030)
(u'PENDING', 7610)
(u'关闭', 7556)
(u'ON_HOLD', 3798)
(u'处理中', 8275)
(u'PAYMENT_REVIEW', 729)
我理解困难的问题是:
1、为什么2个lambda函数中取的aggregateByKey函数作为参数?
2. 可视化第一个 lambda 函数的作用?
3. 形象化第二个 lambda 函数的作用?
如果您能用一些简单的框图向我解释上述问题以及 aggregateByKey 的工作原理,那将非常有帮助?或许中间算了几个?
感谢您的帮助!
谢谢,
希夫
Spark RDDs被划分为分区,所以当你对所有数据进行聚合函数时,你首先会聚合每个分区内的数据(分区只是数据的细分)。然后,您需要告诉 Spark 如何聚合分区。
第一个 lambda 函数告诉 Spark 在遇到新值时如何更改 运行 计数(累加器)。因为你在计数,所以你只需将 1 添加到累加器。在一个切片中,如果 运行 计数当前为 4,并且添加了另一个值,则 运行 计数应为 4 + 1 = 5
。因此,您的第一个 lambda 函数是:
lambda acc, val: acc + 1
第二个 lambda 函数告诉 Spark 如何将一个数据切片的 运行 计数与另一数据切片的 运行 计数结合起来。如果一个切片的计数为 5,第二个切片的计数为 7,则合并后的计数为 5 + 7 = 12
。所以你的第二个函数最好写成:
lambda acc1, acc2: acc1 + acc2
剩下的唯一微妙之处是一切都是在 "by key" 的基础上完成的。累加器(计数)因密钥而异。