加入两个 RDD,然后按另一列分组
Join two RDDs then group by another column
我有 2 个 RDD,第一个的格式为 Code: string, Name: string
,rdd2 的格式为 Code: string, Year: string, Delay: float
rdd1 = [('a', 'name1'), ('b', 'name2')]
rdd2 = [('a', '2000', 1.25), ('a', '2000', 2.0), ('b', '2010', -1.0)]
我想执行连接(在 code
上),以便我可以按 name
对数据进行分组,以便在 [=16= 上进行计数、平均值、最小值和最大值等聚合].
我试图在执行连接后展平值,如下所示:
joined = rdd1.join(rdd2).map(lambda (keys, values): (keys,) + values)
但出现错误:缺少 1 个必需的位置参数。
我的加入结果也只显示 [('code', ('name', 'year'))]
,不包括延迟值。我该如何解决?
这在 Python 3.x 中不起作用,因为对元组参数解包 (PEP-3113) 的支持已被删除。因此类型错误。
对 RDD 连接作为键值工作,其中
(a,b) joined (a, c) will give you (a, (b,c))
因此,使其工作的一种方法是:
joined = rdd1.join(rdd2.map(lambda x: (x[0],x[1:])))
joined.map(lambda x: (x[0],)+ (x[1][0],) + x[1][1]).collect()
# Output
# [('b', 'name2', '2010', -1.0),
# ('a', 'name1', '2000', 1.25),
# ('a', 'name1', '2000', 2.0)]
加入前需要确认rdd2
是(key, value)
的形式。否则第二个之后的元素将被丢弃。
rdd3 = rdd1.join(rdd2.map(lambda x: (x[0], (x[1], x[2]))))
rdd3.collect()
# [('b', ('name2', ('2010', -1.0))), ('a', ('name1', ('2000', 1.25))), ('a', ('name1', ('2000', 2.0)))]
如果要去掉嵌套结构,可以再添加一个mapValues
:
rdd3 = rdd1.join(rdd2.map(lambda x: (x[0], (x[1], x[2])))).mapValues(lambda x: (x[0], x[1][0], x[1][1]))
rdd3.collect()
# [('b', ('name2', '2010', -1.0)), ('a', ('name1', '2000', 1.25)), ('a', ('name1', '2000', 2.0))]
我有 2 个 RDD,第一个的格式为 Code: string, Name: string
,rdd2 的格式为 Code: string, Year: string, Delay: float
rdd1 = [('a', 'name1'), ('b', 'name2')]
rdd2 = [('a', '2000', 1.25), ('a', '2000', 2.0), ('b', '2010', -1.0)]
我想执行连接(在 code
上),以便我可以按 name
对数据进行分组,以便在 [=16= 上进行计数、平均值、最小值和最大值等聚合].
我试图在执行连接后展平值,如下所示:
joined = rdd1.join(rdd2).map(lambda (keys, values): (keys,) + values)
但出现错误:缺少 1 个必需的位置参数。
我的加入结果也只显示 [('code', ('name', 'year'))]
,不包括延迟值。我该如何解决?
这在 Python 3.x 中不起作用,因为对元组参数解包 (PEP-3113) 的支持已被删除。因此类型错误。
对 RDD 连接作为键值工作,其中
(a,b) joined (a, c) will give you (a, (b,c))
因此,使其工作的一种方法是:
joined = rdd1.join(rdd2.map(lambda x: (x[0],x[1:])))
joined.map(lambda x: (x[0],)+ (x[1][0],) + x[1][1]).collect()
# Output
# [('b', 'name2', '2010', -1.0),
# ('a', 'name1', '2000', 1.25),
# ('a', 'name1', '2000', 2.0)]
加入前需要确认rdd2
是(key, value)
的形式。否则第二个之后的元素将被丢弃。
rdd3 = rdd1.join(rdd2.map(lambda x: (x[0], (x[1], x[2]))))
rdd3.collect()
# [('b', ('name2', ('2010', -1.0))), ('a', ('name1', ('2000', 1.25))), ('a', ('name1', ('2000', 2.0)))]
如果要去掉嵌套结构,可以再添加一个mapValues
:
rdd3 = rdd1.join(rdd2.map(lambda x: (x[0], (x[1], x[2])))).mapValues(lambda x: (x[0], x[1][0], x[1][1]))
rdd3.collect()
# [('b', ('name2', '2010', -1.0)), ('a', ('name1', '2000', 1.25)), ('a', ('name1', '2000', 2.0))]