如何将包含 (vertexId,edgeId) 的 Map 转换为 GraphX RDD
How to convert a Map containing (vertexId,edgeId) into GraphX RDDs
从文件中解析图后,我得到一个 Map,其中键表示顶点 (id),值表示边 (id)。为了创建边 (Vx->Vy)
我们需要使用值(边 id)连接 Map 条目。目标是根据此表示创建一个 GraphX 图。
这是我目前的情况:
tempLHM.foreach(x=>println(x))
(A.L0,A)
(B.L0,B)
(C.L0,C)
(D.L0,D)
(E.L0,E)
(a.L0M1,A)
(b.L0M1,B)
(c.L0M1,n4)
(a.L0M2,n4)
(b.L0M2,D)
(c.L0M2,n5)
(a.L0M3,n5)
(b.L0M3,C)
(c.L0M3,E)
有没有直接的方法将这个 hashmap 映射到顶点和边 RDD?
tempLHM 是可变的 LinkedHashMap[String,String]
。在上面的hashmap中,在元素(A.L0,A)和(a.L0M1,A)中,A.L0和a.L0M1是由公共值A(边)[=14=连接的键(顶点) ]
下面是我想得出的结果
val vertex:RDD(vertexId, VertexName) i.e ((A.L0).Long, A.L0), ((a.L0M1).Long, a.L0M1) etc
val edge:RDD((vertexId1, vertexId2), EdgeName) i.e ((A.L0).Long, (a.L0M1).Long), A)
假设您的数据具有这种结构。
val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")
这里有两条边 ("v1","v2") 和 ("v3","v4")
假设您有一个简单的图(不是可以有多个节点通过边连接的超图)。因此,此解决方案的假设是一条边仅连接两个节点并且边只出现一次。
import collection.mutable.{ HashMap, MultiMap, Set }
import java.security.MessageDigest
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
// a hacky way to go from string to Long since GraphX need Longs to
// represent vertex IDs. You might want to do something different
// here to make sure that your IDs are unique.
def str2Long(s: String) = s.##.toLong
val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")
// We use a multi-map to create an inverse map (Edge->Set(Vertices))
val mm = new HashMap[String, Set[String]] with MultiMap[String, String]
d.foreach{ x => mm.addBinding(x._2,x._1) }
val edges = mm.map{ case(k,v) => Edge[String](str2Long(v.head),str2Long(v.last), k) }.toList
val vertices = d.keys.map(x => (str2Long(x), x)).toList
val edgeRdd = sc.parallelize(edges)
val vertexRdd = sc.parallelize(vertices)
val g = Graph(vertexRdd, edgeRdd)
如果你打印你得到的边和顶点:
g.vertices.foreach(println)
g.edges.foreach(println)
(3709,v3)
(3707,v1)
(3708,v2)
(3710,v4)
Edge(3709,3710,e2)
Edge(3707,3708,e1)
注意:此处的解决方案仅适用于适合单个节点内存的数据。根据您的问题,我看到您将数据加载到本地地图中,因此以下解决方案适合您。如果您想 运行 在具有多个节点的庞大数据集上执行此操作,则上述解决方案将不起作用。
更新解决方案
此解决方案比上面的解决方案更具可扩展性。它确保您始终停留在 RDD 域中,而无需在驱动程序处收集图形(例如,上面我们将所有原始数据加载到 scala Map 中,我们将在此处避免这种情况)。它还涵盖了我们在不同节点之间有一个公共边 ID 的情况(以类似超图的方式)。
假设文本文件的格式如下:
v1,e1
v2,e1
v3,e2
v4,e2
在下面的代码中,我们首先读取原始数据,然后将它们转换为适当的顶点和边 RDD。
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
def str2Long(s: String) = s.##.toLong
val rawData: RDD[String] = sc.textFile("...")
val toBeJoined: RDD[(String, String)]
= rawData.map(_.split(",")).map{ case Array(x,y) => (y,x) }
请注意,我们得到的图将是双向的:如果我们有边 (v1,v2)
,我们也有边 (v2,v1)
。
val biDirectionalEdges: RDD[(String, (String, String))]
= toBeJoined.join(toBeJoined).filter{ case(e,(v1,v2)) => v1 != v2 }
val edgeRdd =
biDirectionalEdges.map{ case(e,v) => Edge[String](str2Long(v._1),str2Long(v._2), e) }
val vertexRdd =
toBeJoined.map(_._1).distinct.map(x => (str2Long(x), x))
val g = Graph(vertexRdd, edgeRdd)
// Verify that this is the right graph
g.vertices.take(10).foreach(println)
g.edges.take(10).foreach(println)
从文件中解析图后,我得到一个 Map,其中键表示顶点 (id),值表示边 (id)。为了创建边 (Vx->Vy)
我们需要使用值(边 id)连接 Map 条目。目标是根据此表示创建一个 GraphX 图。
这是我目前的情况:
tempLHM.foreach(x=>println(x))
(A.L0,A)
(B.L0,B)
(C.L0,C)
(D.L0,D)
(E.L0,E)
(a.L0M1,A)
(b.L0M1,B)
(c.L0M1,n4)
(a.L0M2,n4)
(b.L0M2,D)
(c.L0M2,n5)
(a.L0M3,n5)
(b.L0M3,C)
(c.L0M3,E)
有没有直接的方法将这个 hashmap 映射到顶点和边 RDD?
tempLHM 是可变的 LinkedHashMap[String,String]
。在上面的hashmap中,在元素(A.L0,A)和(a.L0M1,A)中,A.L0和a.L0M1是由公共值A(边)[=14=连接的键(顶点) ]
下面是我想得出的结果
val vertex:RDD(vertexId, VertexName) i.e ((A.L0).Long, A.L0), ((a.L0M1).Long, a.L0M1) etc
val edge:RDD((vertexId1, vertexId2), EdgeName) i.e ((A.L0).Long, (a.L0M1).Long), A)
假设您的数据具有这种结构。
val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")
这里有两条边 ("v1","v2") 和 ("v3","v4")
假设您有一个简单的图(不是可以有多个节点通过边连接的超图)。因此,此解决方案的假设是一条边仅连接两个节点并且边只出现一次。
import collection.mutable.{ HashMap, MultiMap, Set }
import java.security.MessageDigest
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
// a hacky way to go from string to Long since GraphX need Longs to
// represent vertex IDs. You might want to do something different
// here to make sure that your IDs are unique.
def str2Long(s: String) = s.##.toLong
val d = Map("v1" -> "e1", "v2" -> "e1", "v3" -> "e2", "v4" -> "e2")
// We use a multi-map to create an inverse map (Edge->Set(Vertices))
val mm = new HashMap[String, Set[String]] with MultiMap[String, String]
d.foreach{ x => mm.addBinding(x._2,x._1) }
val edges = mm.map{ case(k,v) => Edge[String](str2Long(v.head),str2Long(v.last), k) }.toList
val vertices = d.keys.map(x => (str2Long(x), x)).toList
val edgeRdd = sc.parallelize(edges)
val vertexRdd = sc.parallelize(vertices)
val g = Graph(vertexRdd, edgeRdd)
如果你打印你得到的边和顶点:
g.vertices.foreach(println)
g.edges.foreach(println)
(3709,v3)
(3707,v1)
(3708,v2)
(3710,v4)
Edge(3709,3710,e2)
Edge(3707,3708,e1)
注意:此处的解决方案仅适用于适合单个节点内存的数据。根据您的问题,我看到您将数据加载到本地地图中,因此以下解决方案适合您。如果您想 运行 在具有多个节点的庞大数据集上执行此操作,则上述解决方案将不起作用。
更新解决方案
此解决方案比上面的解决方案更具可扩展性。它确保您始终停留在 RDD 域中,而无需在驱动程序处收集图形(例如,上面我们将所有原始数据加载到 scala Map 中,我们将在此处避免这种情况)。它还涵盖了我们在不同节点之间有一个公共边 ID 的情况(以类似超图的方式)。
假设文本文件的格式如下:
v1,e1
v2,e1
v3,e2
v4,e2
在下面的代码中,我们首先读取原始数据,然后将它们转换为适当的顶点和边 RDD。
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph
def str2Long(s: String) = s.##.toLong
val rawData: RDD[String] = sc.textFile("...")
val toBeJoined: RDD[(String, String)]
= rawData.map(_.split(",")).map{ case Array(x,y) => (y,x) }
请注意,我们得到的图将是双向的:如果我们有边 (v1,v2)
,我们也有边 (v2,v1)
。
val biDirectionalEdges: RDD[(String, (String, String))]
= toBeJoined.join(toBeJoined).filter{ case(e,(v1,v2)) => v1 != v2 }
val edgeRdd =
biDirectionalEdges.map{ case(e,v) => Edge[String](str2Long(v._1),str2Long(v._2), e) }
val vertexRdd =
toBeJoined.map(_._1).distinct.map(x => (str2Long(x), x))
val g = Graph(vertexRdd, edgeRdd)
// Verify that this is the right graph
g.vertices.take(10).foreach(println)
g.edges.take(10).foreach(println)