是否将同一个 RDD 上的多个 reduceByKey 编译成单个扫描?
Are multiple reduceByKey on the same RDD compiled into a single scan?
假设我有一个 RDD (50M records/dayredu),我想用几种不同的方式对其进行总结。
RDD 记录是 4 元组:(keep, foo, bar, baz)
。
keep
- 布尔值
foo
、bar
、baz
- 0/1 整数
我想计算每个 foo
&c 有多少被保留和丢弃,即,我必须为 foo
执行以下操作(对于 bar
和 baz
):
rdd.filter(lambda keep, foo, bar, baz: foo == 1)
.map(lambda keep, foo, bar, baz: keep, 1)
.reduceByKey(operator.add)
这将 return(在 collect
之后)像 [(True,40000000),(False,10000000)]
.
这样的列表
问题是:是否有一种简单方法来避免扫描rdd
3次(每个foo
, bar
, baz
)?
我的意思是不是一种重写上述代码来处理所有3个字段的方法,而是告诉spark一次处理所有3个管道.
如果我没看错你的问题,你需要 RDD.aggregate.
val zeroValue = (0L, 0L, 0L, 0L, 0L, 0L) // tfoo, tbar, tbaz, ffoo, fbar, fbaz
rdd.aggregate(zeroValue)(
(prior, current) => if (current._1) {
(prior._1 + current._2, prior._2 + current._3, prior._3 + current._4,
prior._4, prior._5, prior._6)
} else {
(prior._1, prior._2, prior._3,
prior._4 + current._2, prior._5 + current._3, prior._6 + current._4)
},
(left, right) =>
(left._1 + right._1,
left._2 + right._2,
left._3 + right._3,
left._4 + right._4,
left._5 + right._5,
left._6 + right._6)
)
聚合在概念上类似于列表上的概念化简函数,但 RDD 不是列表,它们是分布式的,因此您提供两个函数参数,一个用于对每个分区进行操作,一个用于合并结果处理分区。
可以通过使用不同的线程提交作业来并行执行三个管道,但这将通过 RDD 三次,并且需要集群上最多 3 倍的资源。
可以通过重写作业一次处理所有计数来一次完成作业 - 关于 aggregate
的答案是一个选项。成对拆分数据 (keep, foo) (keep, bar), (keep, baz)
将是另一个。
不可能在不更改任何代码的情况下一次性完成工作,因为 Spark 无法知道这些工作与同一数据集相关.至多,第一个之后的后续作业的速度可以通过 caching
初始 rdd 和 rdd.cache
在 .filter().map().reduce()
步骤之前提高;这仍将通过 RDD 3 次,但如果所有数据都适合集群的内存,则第 2 次和第 3 次可能会快得多:
rdd.cache
// first reduceByKey action will trigger the cache and rdd data will be kept in memory
val foo = rdd.filter(fooFilter).map(fooMap).reduceByKey(???)
// subsequent operations will execute faster as the rdd is now available in mem
val bar = rdd.filter(barFilter).map(barMap).reduceByKey(???)
val baz = rdd.filter(bazFilter).map(bazMap).reduceByKey(???)
如果我这样做,我会创建成对的相关数据并一次性计算它们:
// We split the initial tuple into pairs keyed by the data type ("foo", "bar", "baz") and the keep information. dataPairs will contain data like: (("bar",true),1), (("foo",false),1)
val dataPairs = rdd.flatmap{case (keep, foo, bar, baz) =>
def condPair(name:String, x:Int):Option[((String,Boolean), Int)] = if (x==1) Some(((name,keep),x)) else None
Seq(condPair("foo",foo), condPair("bar",bar), condPair("baz",baz)).flatten
}
val totals = dataPairs.reduceByKey(_ + _)
这简单,只传递一次数据,但需要重写代码。我会说它在回答问题时得分 66,66%。
假设我有一个 RDD (50M records/dayredu),我想用几种不同的方式对其进行总结。
RDD 记录是 4 元组:(keep, foo, bar, baz)
。
keep
- 布尔值foo
、bar
、baz
- 0/1 整数
我想计算每个 foo
&c 有多少被保留和丢弃,即,我必须为 foo
执行以下操作(对于 bar
和 baz
):
rdd.filter(lambda keep, foo, bar, baz: foo == 1)
.map(lambda keep, foo, bar, baz: keep, 1)
.reduceByKey(operator.add)
这将 return(在 collect
之后)像 [(True,40000000),(False,10000000)]
.
问题是:是否有一种简单方法来避免扫描rdd
3次(每个foo
, bar
, baz
)?
我的意思是不是一种重写上述代码来处理所有3个字段的方法,而是告诉spark一次处理所有3个管道.
如果我没看错你的问题,你需要 RDD.aggregate.
val zeroValue = (0L, 0L, 0L, 0L, 0L, 0L) // tfoo, tbar, tbaz, ffoo, fbar, fbaz
rdd.aggregate(zeroValue)(
(prior, current) => if (current._1) {
(prior._1 + current._2, prior._2 + current._3, prior._3 + current._4,
prior._4, prior._5, prior._6)
} else {
(prior._1, prior._2, prior._3,
prior._4 + current._2, prior._5 + current._3, prior._6 + current._4)
},
(left, right) =>
(left._1 + right._1,
left._2 + right._2,
left._3 + right._3,
left._4 + right._4,
left._5 + right._5,
left._6 + right._6)
)
聚合在概念上类似于列表上的概念化简函数,但 RDD 不是列表,它们是分布式的,因此您提供两个函数参数,一个用于对每个分区进行操作,一个用于合并结果处理分区。
可以通过使用不同的线程提交作业来并行执行三个管道,但这将通过 RDD 三次,并且需要集群上最多 3 倍的资源。
可以通过重写作业一次处理所有计数来一次完成作业 - 关于 aggregate
的答案是一个选项。成对拆分数据 (keep, foo) (keep, bar), (keep, baz)
将是另一个。
不可能在不更改任何代码的情况下一次性完成工作,因为 Spark 无法知道这些工作与同一数据集相关.至多,第一个之后的后续作业的速度可以通过 caching
初始 rdd 和 rdd.cache
在 .filter().map().reduce()
步骤之前提高;这仍将通过 RDD 3 次,但如果所有数据都适合集群的内存,则第 2 次和第 3 次可能会快得多:
rdd.cache
// first reduceByKey action will trigger the cache and rdd data will be kept in memory
val foo = rdd.filter(fooFilter).map(fooMap).reduceByKey(???)
// subsequent operations will execute faster as the rdd is now available in mem
val bar = rdd.filter(barFilter).map(barMap).reduceByKey(???)
val baz = rdd.filter(bazFilter).map(bazMap).reduceByKey(???)
如果我这样做,我会创建成对的相关数据并一次性计算它们:
// We split the initial tuple into pairs keyed by the data type ("foo", "bar", "baz") and the keep information. dataPairs will contain data like: (("bar",true),1), (("foo",false),1)
val dataPairs = rdd.flatmap{case (keep, foo, bar, baz) =>
def condPair(name:String, x:Int):Option[((String,Boolean), Int)] = if (x==1) Some(((name,keep),x)) else None
Seq(condPair("foo",foo), condPair("bar",bar), condPair("baz",baz)).flatten
}
val totals = dataPairs.reduceByKey(_ + _)
这简单,只传递一次数据,但需要重写代码。我会说它在回答问题时得分 66,66%。