Spark - 嵌套RDD操作
Spark - Nested RDD Operation
我有两个RDD说
rdd1 =
id | created | destroyed | price
1 | 1 | 2 | 10
2 | 1 | 5 | 11
3 | 2 | 3 | 11
4 | 3 | 4 | 12
5 | 3 | 5 | 11
rdd2 =
[1,2,3,4,5] # lets call these value as timestamps (ts)
rdd2 基本上是使用 range(intial_value, end_value, interval) 生成的。这里的参数可以变化。大小可以与 rdd1 相同或不同。这个想法是根据 rdd2 的值使用过滤条件将记录从 rdd1 提取到 rdd2(来自 rdd1 的记录可以在提取时重复,正如您在输出中看到的那样)
过滤条件rdd1.created <= ts < rdd1.destroyed)
预期输出:
ts | prices
1 | 10,11 # i.e. for ids 1,2 of rdd1
2 | 11,11 # ids 2,3
3 | 11,12,11 # ids 2,4,5
4 | 11,11 # ids 2,5
现在我想根据某些条件过滤 RDD1,使用 RDD2 的键。 (如上所述) 和 returns 连接 RDD2 的键和 RDD1
的过滤结果的结果
我也是:
rdd2.map(lambda x : somefilterfunction(x, rdd1))
def somefilterfunction(x, rdd1):
filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
prices = filtered_rdd1.map(lambda x : x[3])
res = prices.collect()
return (x, list(res))
我得到:
Exception: It appears that you are attempting to broadcast an RDD or
reference an RDD from an action or transformation. RDD transformations
and actions can only be invoked by the driver, not inside of other
transformations; for example, rdd1.map(lambda x: rdd2.values.count() *
x) is invalid because the values transformation and count action
cannot be performed inside of the rdd1.map transformation. For more
information, see SPARK-5063.
我尝试使用 groupBy ,但是因为这里 rdd1 的元素可以一次又一次地重复,而我理解的分组只会将 rdd1 的每个元素放在某个特定的插槽中一次。
现在唯一的方法是使用普通的 for 循环并进行过滤并在最后加入所有内容。
有什么建议吗?
由于您使用的是常规范围,因此根本没有理由创建第二个 RDD。您可以简单地为每条记录生成特定范围内的值:
from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile
rdd1 = sc.parallelize([
(1, 1, 2, 10),
(2, 1, 5, 11),
(3, 2, 3, 11),
(4, 3, 4, 12),
(5, 3, 5, 11),
])
def generate(start, end, step):
def _generate(id, created, destroyed, price):
# Smallest ts >= created
start_for_record = int(ceil((created - start) / step) * step + start)
rng = takewhile(
lambda x: created <= x < destroyed,
xrange(start_for_record, end, step)) # In Python 3.x use range
for i in rng:
yield i, price
return _generate
result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()
结果:
result.mapValues(list).collect()
## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]
我有两个RDD说
rdd1 =
id | created | destroyed | price
1 | 1 | 2 | 10
2 | 1 | 5 | 11
3 | 2 | 3 | 11
4 | 3 | 4 | 12
5 | 3 | 5 | 11
rdd2 =
[1,2,3,4,5] # lets call these value as timestamps (ts)
rdd2 基本上是使用 range(intial_value, end_value, interval) 生成的。这里的参数可以变化。大小可以与 rdd1 相同或不同。这个想法是根据 rdd2 的值使用过滤条件将记录从 rdd1 提取到 rdd2(来自 rdd1 的记录可以在提取时重复,正如您在输出中看到的那样)
过滤条件rdd1.created <= ts < rdd1.destroyed)
预期输出:
ts | prices
1 | 10,11 # i.e. for ids 1,2 of rdd1
2 | 11,11 # ids 2,3
3 | 11,12,11 # ids 2,4,5
4 | 11,11 # ids 2,5
现在我想根据某些条件过滤 RDD1,使用 RDD2 的键。 (如上所述) 和 returns 连接 RDD2 的键和 RDD1
的过滤结果的结果我也是:
rdd2.map(lambda x : somefilterfunction(x, rdd1))
def somefilterfunction(x, rdd1):
filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
prices = filtered_rdd1.map(lambda x : x[3])
res = prices.collect()
return (x, list(res))
我得到:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
我尝试使用 groupBy ,但是因为这里 rdd1 的元素可以一次又一次地重复,而我理解的分组只会将 rdd1 的每个元素放在某个特定的插槽中一次。
现在唯一的方法是使用普通的 for 循环并进行过滤并在最后加入所有内容。
有什么建议吗?
由于您使用的是常规范围,因此根本没有理由创建第二个 RDD。您可以简单地为每条记录生成特定范围内的值:
from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile
rdd1 = sc.parallelize([
(1, 1, 2, 10),
(2, 1, 5, 11),
(3, 2, 3, 11),
(4, 3, 4, 12),
(5, 3, 5, 11),
])
def generate(start, end, step):
def _generate(id, created, destroyed, price):
# Smallest ts >= created
start_for_record = int(ceil((created - start) / step) * step + start)
rng = takewhile(
lambda x: created <= x < destroyed,
xrange(start_for_record, end, step)) # In Python 3.x use range
for i in rng:
yield i, price
return _generate
result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()
结果:
result.mapValues(list).collect()
## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]