在 Spark 中为每个执行者创建单例对象

Creating singleton object in Spark for each executor

我在这里描述了一个非常相似的问题: How to perform one operation on each executor once in spark 我在第一个答案中采用了第一种方法,但仍然遇到序列化问题。

我想做的是,我有像 (sourceVertex, targetVertex) 的元组这样的查询,并将这些查询发送给执行者,执行者将 return 给我一条最短路径。为此,我正在使用 jgrapht。

当我这样实现的时候

class ShortestPath(graph: SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge], 
bc: Broadcast[SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge]]) extends Serializable {

  def calculateShortestPath(vertexRDD: RDD[Node]) = {

    val result = vertexRDD.map(vertex => {
      val dijkstraShortestPath: DijkstraShortestPath[Node, DefaultWeightedEdge] 
                = new DijkstraShortestPath[Node, DefaultWeightedEdge](bc.value)
      val distanceIn = dijkstraShortestPath.getPath(vertex, Node(4, 1, true)).getWeight()
      distanceIn
    })
    result.collect().foreach(println(_))
  }

}

object ShortestPath {
  def apply(graph: SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge], 
bc: Broadcast[SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge]]): ShortestPath = new ShortestPath(graph, bc)
}

一切都好 但问题是我想我正在为每个任务创建 dijkstraShortestPath 对象,对吗?

我的目标是为每个执行器创建这个对象,并将它用于该执行器上的每个任务。

我给出的 link 说用 lazy val 创建一个对象,在这里实例化你的想法然后使用它 RDD 映射函数。我这样实施该解决方案:

object Dij {
  lazy val dijsktra = {
    val graph = GraphCreator.createGraph()
    val dijkstraShortestPath: DijkstraShortestPath[Node, DefaultWeightedEdge] = new DijkstraShortestPath[Node, DefaultWeightedEdge](graph)
    dijkstraShortestPath
  }
}

并在 ShortestPath 中使用 class

    val result = vertexRDD.map(vertex => {
      val dijkstraShortestPath = Dij.dijsktra
      val distanceIn = dijkstraShortestPath.getPath(vertex, Node(4, 1, true)).getWeight()
      dijkstraShortestPath
    })

    result.collect().foreach(println(_))

但是我遇到了序列化错误谢谢说 - object not serializable (class: org.jgrapht.alg.shortestpath.DijkstraShortestPath, value: org.jgrapht.alg.shortestpath.DijkstraShortestPath@2cb8e13b) 没错,当我查看实现时,没有可序列化的。

另一个问题是,如果它不是可序列化的,那么我的第一个实现是如何工作的?

我认为您的第二个代码段中存在意外错误。

给地图的第一个函数returns一个权重(大概是一个Double?)第二个returns一个不可序列化的DijkstraShortestPath。这解释了为什么这两个代码段的行为不同。