如何将包含 (vertexId,edgeId) 的 Map 转换为 GraphX RDD

How to convert a Map containing (vertexId,edgeId) into GraphX RDDs

从文件中解析图后,我得到一个 Map,其中键表示顶点 (id),值表示边 (id)。为了创建边 (Vx->Vy) 我们需要使用值(边 id)连接 Map 条目。目标是根据此表示创建一个 GraphX 图。

这是我目前的情况:

tempLHM.foreach(x=>println(x))

(A.L0,A)
(B.L0,B)
(C.L0,C)
(D.L0,D)
(E.L0,E)
(a.L0M1,A)
(b.L0M1,B)
(c.L0M1,n4)
(a.L0M2,n4)
(b.L0M2,D)
(c.L0M2,n5)
(a.L0M3,n5)
(b.L0M3,C)
(c.L0M3,E)

有没有直接的方法将这个 hashmap 映射到顶点和边 RDD?

tempLHM 是可变的 LinkedHashMap[String,String]。在上面的hashmap中,在元素(A.L0,A)和(a.L0M1,A)中,A.L0和a.L0M1是由公共值A(边)[=14=连接的键(顶点) ]

下面是我想得出的结果

val vertex:RDD(vertexId, VertexName)  i.e ((A.L0).Long, A.L0), ((a.L0M1).Long, a.L0M1) etc

val edge:RDD((vertexId1, vertexId2), EdgeName) i.e ((A.L0).Long, (a.L0M1).Long), A)

假设您的数据具有这种结构。

val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")

这里有两条边 ("v1","v2") 和 ("v3","v4")

假设您有一个简单的图(不是可以有多个节点通过边连接的超图)。因此,此解决方案的假设是一条边仅连接两个节点并且边只出现一次。

import collection.mutable.{ HashMap, MultiMap, Set }
import java.security.MessageDigest
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph

// a hacky way to go from string to Long since GraphX need Longs to
// represent vertex IDs. You might want to do something different 
// here to make sure that your IDs are unique.
def str2Long(s: String) = s.##.toLong

val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")

// We use a multi-map to create an inverse map (Edge->Set(Vertices))
val mm = new HashMap[String, Set[String]] with MultiMap[String, String]
d.foreach{ x => mm.addBinding(x._2,x._1) }

val edges = mm.map{ case(k,v) => Edge[String](str2Long(v.head),str2Long(v.last), k) }.toList
val vertices = d.keys.map(x => (str2Long(x), x)).toList

val edgeRdd = sc.parallelize(edges)
val vertexRdd = sc.parallelize(vertices)

val g = Graph(vertexRdd, edgeRdd)

如果你打印你得到的边和顶点:​​

g.vertices.foreach(println)
g.edges.foreach(println)


(3709,v3)
(3707,v1)
(3708,v2)
(3710,v4)
Edge(3709,3710,e2)
Edge(3707,3708,e1)

注意:此处的解决方案仅适用于适合单个节点内存的数据。根据您的问题,我看到您将数据加载到本地地图中,因此以下解决方案适合您。如果您想 运行 在具有多个节点的庞大数据集上执行此操作,则上述解决方案将不起作用。


更新解决方案

此解决方案比上面的解决方案更具可扩展性。它确保您始终停留在 RDD 域中,而无需在驱动程序处收集图形(例如,上面我们将所有原始数据加载到 scala Map 中,我们将在此处避免这种情况)。它还涵盖了我们在不同节点之间有一个公共边 ID 的情况(以类似超图的方式)。

假设文本文件的格式如下:

v1,e1 
v2,e1 
v3,e2
v4,e2

在下面的代码中,我们首先读取原始数据,然后将它们转换为适当的顶点和边 RDD。

import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph

def str2Long(s: String) = s.##.toLong

val rawData: RDD[String] = sc.textFile("...")

val toBeJoined: RDD[(String, String)] 
  = rawData.map(_.split(",")).map{ case Array(x,y) => (y,x) }

请注意,我们得到的图将是双向的:如果我们有边 (v1,v2),我们也有边 (v2,v1)

val biDirectionalEdges: RDD[(String, (String, String))] 
  = toBeJoined.join(toBeJoined).filter{ case(e,(v1,v2)) => v1 != v2 }

val edgeRdd = 
  biDirectionalEdges.map{ case(e,v) => Edge[String](str2Long(v._1),str2Long(v._2), e) }
val vertexRdd = 
  toBeJoined.map(_._1).distinct.map(x => (str2Long(x), x))

val g = Graph(vertexRdd, edgeRdd)

// Verify that this is the right graph
g.vertices.take(10).foreach(println)
g.edges.take(10).foreach(println)