Pyspark:从另一个 RDD 获取一个 RDD 元素的索引
Pyspark: Get indexes of an RDD elements from another RDD
所以我有这两个 rdds,如下所示:
rdd1: [([1, 2], 0), ([2, 4], 1)]
rdd2: [([2, 4], 0), ([1, 2], 1)]
我需要的是一个列表,它指示rdd2 中rdd1 元素的索引。所以它会是这样的:
[1,0]
我知道我可以使用过滤器和 lambda 函数找到一个特定的元素索引,但找到整个事情是另一回事。我头脑中最天真的方法是使用 for 循环,我确信这不是处理此问题的最佳方法。任何帮助将不胜感激。
谢谢
首先,您必须制作密钥 hashable
,以便可以以分布式方式比较这些密钥。让我们创建一个小帮手
def tupelize_keys(kv):
k, v = kv
return tuple(k), v
rdd1H = sc.parallelize([
([1, 2], 0), ([2, 4], 1), ([9, 9], 3)
]).map(tupelize_keys)
rdd2H = sc.parallelize([
([2, 4], 0), ([1, 2], 1), ([1, 2], 3)
]).map(tupelize_keys)
剩下的就是一个简单的外连接:
rdd1H.leftOuterJoin(rdd2H).values().collect()
## [(0, 1), (0, 3), (1, 0), (3, None)]
请注意,这是一张多图,顺序不会保留。
所以我有这两个 rdds,如下所示:
rdd1: [([1, 2], 0), ([2, 4], 1)]
rdd2: [([2, 4], 0), ([1, 2], 1)]
我需要的是一个列表,它指示rdd2 中rdd1 元素的索引。所以它会是这样的:
[1,0]
我知道我可以使用过滤器和 lambda 函数找到一个特定的元素索引,但找到整个事情是另一回事。我头脑中最天真的方法是使用 for 循环,我确信这不是处理此问题的最佳方法。任何帮助将不胜感激。
谢谢
首先,您必须制作密钥 hashable
,以便可以以分布式方式比较这些密钥。让我们创建一个小帮手
def tupelize_keys(kv):
k, v = kv
return tuple(k), v
rdd1H = sc.parallelize([
([1, 2], 0), ([2, 4], 1), ([9, 9], 3)
]).map(tupelize_keys)
rdd2H = sc.parallelize([
([2, 4], 0), ([1, 2], 1), ([1, 2], 3)
]).map(tupelize_keys)
剩下的就是一个简单的外连接:
rdd1H.leftOuterJoin(rdd2H).values().collect()
## [(0, 1), (0, 3), (1, 0), (3, None)]
请注意,这是一张多图,顺序不会保留。