加入两个rdds来制作一个邻接表
join two rdds to make an adjacency list
我是 spark 的新手,我有两个 rdds,我想加入它们来制作邻接表
RDD1 (nodes): (a, b, c, d, e, f, g)
RDD2 (Edges): ((a,b), (a,e), (f, a), (k,l) ...)
现在我想加入这两个 rdds 来创建一个这样的邻接表
( (a,(b,e,..)), (b,(f,....), (g()) ,...)
#assuming that g is not connected to any node , also filter (k,l) because k and l are not in the nodes rdd
稍后我还需要找到节点和边的总数。
因此,如果我理解正确的话,您希望有一个邻接列表,其中最终的 RDD 由键值对组成,键是节点,值是它的边列表。也许如下所示的内容是您的想法?尽管我相信如果您希望 'g' 显示在您的最终 RDD 中,那么将它作为 ('g', '') 放在您的边缘列表中是有意义的,因为您希望传达它没有边缘.
要加入,我们需要将节点列表转换为一对 RDD,因此首先我们并行化以创建 RDD,然后映射一个虚拟值,以便我们拥有键值对。
现在我们可以将两个 RDD 连接起来,在本例中,结果将只是存在于两个 RDD 中的键 'a' 和 'f'。最后,我们剥离我们添加到节点 RDD 和 groupByKey 的虚拟值,以将我们的值组合在一起。
nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a','b'), ('a','e'), ('f', 'a'), ('k','l')]
nodesRDD = sc.parallelize(nodes).map(lambda n: (n, ''))
edgesRDD = sc.parallelize(edges)
joinedRDD = nodesRDD.join(edgesRDD).map(lambda tup: (tup[0], tup[1][1]))
groupedRDD = joinedRDD.groupByKey()
groupedRDD.map(lambda x : (x[0], list(x[1]))).collect()
Out[146]: [('f', ['a']), ('a', ['b', 'e'])]
计数类似,但现在我们不关心实际节点值,只关心它们的计数:
nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a','b'), ('a','e'), ('f', 'a'), ('k','l')]
nodesRDD = sc.parallelize(nodes).map(lambda n: (n, 0))
edgesRDD = sc.parallelize(edges).map(lambda tup: (tup[0], 1))
joinedRDD = nodesRDD.join(edgesRDD).map(lambda tup: (tup[0], tup[1][1]))
reducedRDD = joinedRDD.reduceByKey(lambda a, b: a + b)
reducedRDD.collect()
输出[159]: [('f', 1), ('a', 2)]
我是 spark 的新手,我有两个 rdds,我想加入它们来制作邻接表
RDD1 (nodes): (a, b, c, d, e, f, g)
RDD2 (Edges): ((a,b), (a,e), (f, a), (k,l) ...)
现在我想加入这两个 rdds 来创建一个这样的邻接表
( (a,(b,e,..)), (b,(f,....), (g()) ,...)
#assuming that g is not connected to any node , also filter (k,l) because k and l are not in the nodes rdd
稍后我还需要找到节点和边的总数。
因此,如果我理解正确的话,您希望有一个邻接列表,其中最终的 RDD 由键值对组成,键是节点,值是它的边列表。也许如下所示的内容是您的想法?尽管我相信如果您希望 'g' 显示在您的最终 RDD 中,那么将它作为 ('g', '') 放在您的边缘列表中是有意义的,因为您希望传达它没有边缘.
要加入,我们需要将节点列表转换为一对 RDD,因此首先我们并行化以创建 RDD,然后映射一个虚拟值,以便我们拥有键值对。
现在我们可以将两个 RDD 连接起来,在本例中,结果将只是存在于两个 RDD 中的键 'a' 和 'f'。最后,我们剥离我们添加到节点 RDD 和 groupByKey 的虚拟值,以将我们的值组合在一起。
nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a','b'), ('a','e'), ('f', 'a'), ('k','l')]
nodesRDD = sc.parallelize(nodes).map(lambda n: (n, ''))
edgesRDD = sc.parallelize(edges)
joinedRDD = nodesRDD.join(edgesRDD).map(lambda tup: (tup[0], tup[1][1]))
groupedRDD = joinedRDD.groupByKey()
groupedRDD.map(lambda x : (x[0], list(x[1]))).collect()
Out[146]: [('f', ['a']), ('a', ['b', 'e'])]
计数类似,但现在我们不关心实际节点值,只关心它们的计数:
nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a','b'), ('a','e'), ('f', 'a'), ('k','l')]
nodesRDD = sc.parallelize(nodes).map(lambda n: (n, 0))
edgesRDD = sc.parallelize(edges).map(lambda tup: (tup[0], 1))
joinedRDD = nodesRDD.join(edgesRDD).map(lambda tup: (tup[0], tup[1][1]))
reducedRDD = joinedRDD.reduceByKey(lambda a, b: a + b)
reducedRDD.collect()
输出[159]: [('f', 1), ('a', 2)]