PySpark - 用共享相同值的两个键创建键值对 RDD
PySpark - create pair RDD with two keys that share the same value
我有一个键值对 RDD,其中键是一个演员,值是这个演员参演的一部电影,格式为:
["actor 1", "movie 1"]
["actor 1", "movie 2"]
["actor 1", "movie 3"]
...
["actor n", "movie 2"]
我想将其映射到另一个键值对 RDD,其中每一对由共同参演过同一部电影的两位演员组成。
在上面的例子中,这意味着新的 RDD 将包含 ["actor 1", "actor n"] 这一对,因为他们都参演了 "movie 2"。
一个简单的交换和连接就可以解决问题。首先让我们创建一些虚拟数据和一个小的辅助函数:
# Dummy (actor, movie) records mirroring the example from the question:
# "actor 1" and "actor n" both appear in "movie 2", so they share a movie.
actor_movie = sc.parallelize([
    ("actor 1", "movie 1"),
    ("actor 1", "movie 2"),
    ("actor 1", "movie 3"),
    ("actor n", "movie 2"),
])


def swap(x):
    """Return the 2-element record ``x`` with its two fields reversed.

    Applied to an (actor, movie) record this yields (movie, actor).
    """
    return (x[1], x[0])
接下来你交换顺序:
# Re-key every record by movie, partition by that key using the same number
# of partitions as the input, and cache the result for reuse.
movie_actor = (
    actor_movie
    .map(swap)
    .partitionBy(numPartitions=actor_movie.getNumPartitions())
    .cache()
)
然后进行连接(join):
# Self-join on the movie key, keep only the (actor, actor) values, and drop
# the pairs in which an actor is matched with themselves.
(
    movie_actor.join(movie_actor)  # records sharing the same movie key
    .values()  # discard the movie, keep the two actors
    .filter(lambda actors: actors[0] != actors[1])
)
虽然不是您所要求的,但我认为已经足够好了:
import itertools

# (actor, movie) records: each tuple says the actor appeared in that movie.
movies = sc.parallelize([
    ("P", "SW4"), ("P", "SW5"), ("P", "SW6"),
    ("A", "SW4"), ("A", "SW5"),
    ("B", "SW5"), ("B", "SW6"),
    ("W", "SW4"),
    ("X", "SW1"), ("X", "SW7"), ("X", "SW2"), ("X", "SW3"),
    ("Y", "SW1"), ("Y", "SW7"), ("Y", "SW2"), ("Y", "SW3"),
])


def swap_tuple(kv):
    """Return the 2-tuple ``kv`` with its elements reversed."""
    return (kv[1], kv[0])


# Swap to (movie, actor) first so that grouping collects the actors of each
# movie; without the swap the records would be grouped by actor instead.
movies = movies.map(swap_tuple).groupByKey().mapValues(list)

# For every movie emit one (movie, (actor_a, actor_b)) record per unordered
# pair of its actors; movies with a single actor contribute nothing.
all_pairs = movies.flatMap(
    lambda movie_actors: [
        (movie_actors[0], pair)
        for pair in itertools.combinations(movie_actors[1], 2)
    ]
)

# Parenthesised form works as a statement in Python 2 and a call in Python 3.
print(all_pairs.collect())
"""
>> [('SW1', ('X', 'Y')),
('SW3', ('X', 'Y')),
('SW5', ('P', 'A')),
('SW5', ('P', 'B')),
('SW5', ('A', 'B')),
('SW7', ('X', 'Y')),
('SW2', ('X', 'Y')),
('SW4', ('P', 'A')),
('SW4', ('P', 'W')),
('SW4', ('A', 'W')),
('SW6', ('P', 'B'))]
"""
这里是使用 .ipynb 运行的结果。
我有一个键值对 RDD,其中键是一个演员,值是这个演员参演的一部电影,格式为:
["actor 1", "movie 1"]
["actor 1", "movie 2"]
["actor 1", "movie 3"]
...
["actor n", "movie 2"]
我想将其映射到另一个键值对 RDD,其中每一对由共同参演过同一部电影的两位演员组成。
在上面的例子中,这意味着新的 RDD 将包含 ["actor 1", "actor n"] 这一对,因为他们都参演了 "movie 2"。
一个简单的交换和连接就可以解决问题。首先让我们创建一些虚拟数据和一个小的辅助函数:
# Dummy (actor, movie) records mirroring the example from the question:
# "actor 1" and "actor n" both appear in "movie 2", so they share a movie.
actor_movie = sc.parallelize([
    ("actor 1", "movie 1"),
    ("actor 1", "movie 2"),
    ("actor 1", "movie 3"),
    ("actor n", "movie 2"),
])


def swap(x):
    """Return the 2-element record ``x`` with its two fields reversed.

    Applied to an (actor, movie) record this yields (movie, actor).
    """
    return (x[1], x[0])
接下来你交换顺序:
# Re-key every record by movie, partition by that key using the same number
# of partitions as the input, and cache the result for reuse.
movie_actor = (
    actor_movie
    .map(swap)
    .partitionBy(numPartitions=actor_movie.getNumPartitions())
    .cache()
)
然后进行连接(join):
# Self-join on the movie key, keep only the (actor, actor) values, and drop
# the pairs in which an actor is matched with themselves.
(
    movie_actor.join(movie_actor)  # records sharing the same movie key
    .values()  # discard the movie, keep the two actors
    .filter(lambda actors: actors[0] != actors[1])
)
虽然不是您所要求的,但我认为已经足够好了:
import itertools

# (actor, movie) records: each tuple says the actor appeared in that movie.
movies = sc.parallelize([
    ("P", "SW4"), ("P", "SW5"), ("P", "SW6"),
    ("A", "SW4"), ("A", "SW5"),
    ("B", "SW5"), ("B", "SW6"),
    ("W", "SW4"),
    ("X", "SW1"), ("X", "SW7"), ("X", "SW2"), ("X", "SW3"),
    ("Y", "SW1"), ("Y", "SW7"), ("Y", "SW2"), ("Y", "SW3"),
])


def swap_tuple(kv):
    """Return the 2-tuple ``kv`` with its elements reversed."""
    return (kv[1], kv[0])


# Swap to (movie, actor) first so that grouping collects the actors of each
# movie; without the swap the records would be grouped by actor instead.
movies = movies.map(swap_tuple).groupByKey().mapValues(list)

# For every movie emit one (movie, (actor_a, actor_b)) record per unordered
# pair of its actors; movies with a single actor contribute nothing.
all_pairs = movies.flatMap(
    lambda movie_actors: [
        (movie_actors[0], pair)
        for pair in itertools.combinations(movie_actors[1], 2)
    ]
)

# Parenthesised form works as a statement in Python 2 and a call in Python 3.
print(all_pairs.collect())
"""
>> [('SW1', ('X', 'Y')),
('SW3', ('X', 'Y')),
('SW5', ('P', 'A')),
('SW5', ('P', 'B')),
('SW5', ('A', 'B')),
('SW7', ('X', 'Y')),
('SW2', ('X', 'Y')),
('SW4', ('P', 'A')),
('SW4', ('P', 'W')),
('SW4', ('A', 'W')),
('SW6', ('P', 'B'))]
"""
这里是使用 .ipynb 运行的结果。