Scala 中的拓扑排序
Topological sort in scala
我正在寻找 topological sorting 在 scala 中的一个很好的实现。
解决方案应该是稳定的:
如果输入已经排序,输出应该不变
算法应该是确定性的(hashCode没有作用)
我怀疑有些库可以做到这一点,但我不想因此添加重要的依赖项。
示例问题:
case class Node(name: String)(val referenced: Node*)
val a = Node("a")()
val b = Node("b")(a)
val c = Node("c")(a)
val d = Node("d")(b, c)
val e = Node("e")(d)
val f = Node("f")()
assertEquals("Previous order is kept",
Vector(f, a, b, c, d, e),
topoSort(Vector(f, a, b, c, d, e)))
assertEquals(Vector(a, b, c, d, f, e),
topoSort(Vector(d, c, b, f, a, e)))
这里定义了顺序,如果节点是引用其他声明的编程语言中的声明,则结果顺序将
在声明之前不使用任何声明。
这是我自己的解决方案。此外,它 returns 在输入中检测到可能的循环。
节点的格式不固定,因为调用者提供了一个访问者
将获取一个节点和一个回调,并为每个引用的节点调用回调。
如果不需要循环报告,应该很容易删除。
import scala.collection.mutable
// Based on https://en.wikipedia.org/wiki/Topological_sorting?oldformat=true#Depth-first_search
object TopologicalSort {
case class Result[T](result: IndexedSeq[T], loops: IndexedSeq[IndexedSeq[T]])
type Visit[T] = (T) => Unit
// A visitor is a function that takes a node and a callback.
// The visitor calls the callback for each node referenced by the given node.
type Visitor[T] = (T, Visit[T]) => Unit
def topoSort[T <: AnyRef](input: Iterable[T], visitor: Visitor[T]): Result[T] = {
// Buffer, because it is operated in a stack like fashion
val temporarilyMarked = mutable.Buffer[T]()
val permanentlyMarked = mutable.HashSet[T]()
val loopsBuilder = IndexedSeq.newBuilder[IndexedSeq[T]]
val resultBuilder = IndexedSeq.newBuilder[T]
def visit(node: T): Unit = {
if (temporarilyMarked.contains(node)) {
val loopStartIndex = temporarilyMarked.indexOf(node)
val loop = temporarilyMarked.slice(loopStartIndex, temporarilyMarked.size)
.toIndexedSeq
loopsBuilder += loop
} else if (!permanentlyMarked.contains(node)) {
temporarilyMarked += node
visitor(node, visit)
permanentlyMarked += node
temporarilyMarked.remove(temporarilyMarked.size - 1, 1)
resultBuilder += node
}
}
for (i <- input) {
if (!permanentlyMarked.contains(i)) {
visit(i)
}
}
Result(resultBuilder.result(), loopsBuilder.result())
}
}
在问题的示例中,这将像这样应用:
import TopologicalSort._
def visitor(node: BaseNode, callback: (Node) => Unit): Unit = {
node.referenced.foreach(callback)
}
assertEquals("Previous order is kept",
Vector(f, a, b, c, d, e),
topoSort(Vector(f, a, b, c, d, e), visitor).result)
assertEquals(Vector(a, b, c, d, f, e),
topoSort(Vector(d, c, b, f, a, e), visitor).result)
关于复杂性的一些想法:
这个解决方案的最坏情况复杂度实际上在 O(n + m) 以上,因为每个节点都扫描了 temporarilyMarked
数组。
如果将 temporarilyMarked
替换为例如 HashSet
,渐近复杂度将得到改善。
如果将标记直接存储在节点内部,则可以实现真正的 O(n + m),但将它们存储在外部使得编写通用解决方案更加容易。
我没有 运行 任何性能测试,但我怀疑即使在大图中扫描 temporarilyMarked
数组也不是问题,只要它们不是很深。
Github
上的示例代码和测试
我也有非常相似的代码 published here. That version has a test suite 这对于试验和探索实现很有用。
为什么要检测循环
检测循环可能很有用,例如在大多数数据可以作为 DAG 处理的序列化情况下,但循环可以通过某种特殊安排来处理。
上一节链接的 Github 代码中的测试套件包含具有多个循环的各种情况。
这是一个纯函数实现,returns 仅当图是无环图时才进行拓扑排序。
case class Node(label: Int)
case class Graph(adj: Map[Node, Set[Node]]) {
case class DfsState(discovered: Set[Node] = Set(), activeNodes: Set[Node] = Set(), tsOrder: List[Node] = List(),
isCylic: Boolean = false)
def dfs: (List[Node], Boolean) = {
def dfsVisit(currState: DfsState, src: Node): DfsState = {
val newState = currState.copy(discovered = currState.discovered + src, activeNodes = currState.activeNodes + src,
isCylic = currState.isCylic || adj(src).exists(currState.activeNodes))
val finalState = adj(src).filterNot(newState.discovered).foldLeft(newState)(dfsVisit(_, _))
finalState.copy(tsOrder = src :: finalState.tsOrder, activeNodes = finalState.activeNodes - src)
}
val stateAfterSearch = adj.keys.foldLeft(DfsState()) {(state, n) => if (state.discovered(n)) state else dfsVisit(state, n)}
(stateAfterSearch.tsOrder, stateAfterSearch.isCylic)
}
def topologicalSort: Option[List[Node]] = dfs match {
case (topologicalOrder, false) => Some(topologicalOrder)
case _ => None
}
}
我正在寻找 topological sorting 在 scala 中的一个很好的实现。
解决方案应该是稳定的:
如果输入已经排序,输出应该不变
算法应该是确定性的(hashCode没有作用)
我怀疑有些库可以做到这一点,但我不想因此添加重要的依赖项。
示例问题:
case class Node(name: String)(val referenced: Node*)
val a = Node("a")()
val b = Node("b")(a)
val c = Node("c")(a)
val d = Node("d")(b, c)
val e = Node("e")(d)
val f = Node("f")()
assertEquals("Previous order is kept",
Vector(f, a, b, c, d, e),
topoSort(Vector(f, a, b, c, d, e)))
assertEquals(Vector(a, b, c, d, f, e),
topoSort(Vector(d, c, b, f, a, e)))
这里定义了顺序,如果节点是引用其他声明的编程语言中的声明,则结果顺序将 在声明之前不使用任何声明。
这是我自己的解决方案。此外,它 returns 在输入中检测到可能的循环。
节点的格式不固定,因为调用者提供了一个访问者 将获取一个节点和一个回调,并为每个引用的节点调用回调。
如果不需要循环报告,应该很容易删除。
import scala.collection.mutable
// Based on https://en.wikipedia.org/wiki/Topological_sorting?oldformat=true#Depth-first_search
object TopologicalSort {
case class Result[T](result: IndexedSeq[T], loops: IndexedSeq[IndexedSeq[T]])
type Visit[T] = (T) => Unit
// A visitor is a function that takes a node and a callback.
// The visitor calls the callback for each node referenced by the given node.
type Visitor[T] = (T, Visit[T]) => Unit
def topoSort[T <: AnyRef](input: Iterable[T], visitor: Visitor[T]): Result[T] = {
// Buffer, because it is operated in a stack like fashion
val temporarilyMarked = mutable.Buffer[T]()
val permanentlyMarked = mutable.HashSet[T]()
val loopsBuilder = IndexedSeq.newBuilder[IndexedSeq[T]]
val resultBuilder = IndexedSeq.newBuilder[T]
def visit(node: T): Unit = {
if (temporarilyMarked.contains(node)) {
val loopStartIndex = temporarilyMarked.indexOf(node)
val loop = temporarilyMarked.slice(loopStartIndex, temporarilyMarked.size)
.toIndexedSeq
loopsBuilder += loop
} else if (!permanentlyMarked.contains(node)) {
temporarilyMarked += node
visitor(node, visit)
permanentlyMarked += node
temporarilyMarked.remove(temporarilyMarked.size - 1, 1)
resultBuilder += node
}
}
for (i <- input) {
if (!permanentlyMarked.contains(i)) {
visit(i)
}
}
Result(resultBuilder.result(), loopsBuilder.result())
}
}
在问题的示例中,这将像这样应用:
import TopologicalSort._
def visitor(node: BaseNode, callback: (Node) => Unit): Unit = {
node.referenced.foreach(callback)
}
assertEquals("Previous order is kept",
Vector(f, a, b, c, d, e),
topoSort(Vector(f, a, b, c, d, e), visitor).result)
assertEquals(Vector(a, b, c, d, f, e),
topoSort(Vector(d, c, b, f, a, e), visitor).result)
关于复杂性的一些想法:
这个解决方案的最坏情况复杂度实际上在 O(n + m) 以上,因为每个节点都扫描了 temporarilyMarked
数组。
如果将 temporarilyMarked
替换为例如 HashSet
,渐近复杂度将得到改善。
如果将标记直接存储在节点内部,则可以实现真正的 O(n + m),但将它们存储在外部使得编写通用解决方案更加容易。
我没有 运行 任何性能测试,但我怀疑即使在大图中扫描 temporarilyMarked
数组也不是问题,只要它们不是很深。
Github
上的示例代码和测试我也有非常相似的代码 published here. That version has a test suite 这对于试验和探索实现很有用。
为什么要检测循环
检测循环可能很有用,例如在大多数数据可以作为 DAG 处理的序列化情况下,但循环可以通过某种特殊安排来处理。
上一节链接的 Github 代码中的测试套件包含具有多个循环的各种情况。
这是一个纯函数实现,returns 仅当图是无环图时才进行拓扑排序。
case class Node(label: Int)
case class Graph(adj: Map[Node, Set[Node]]) {
case class DfsState(discovered: Set[Node] = Set(), activeNodes: Set[Node] = Set(), tsOrder: List[Node] = List(),
isCylic: Boolean = false)
def dfs: (List[Node], Boolean) = {
def dfsVisit(currState: DfsState, src: Node): DfsState = {
val newState = currState.copy(discovered = currState.discovered + src, activeNodes = currState.activeNodes + src,
isCylic = currState.isCylic || adj(src).exists(currState.activeNodes))
val finalState = adj(src).filterNot(newState.discovered).foldLeft(newState)(dfsVisit(_, _))
finalState.copy(tsOrder = src :: finalState.tsOrder, activeNodes = finalState.activeNodes - src)
}
val stateAfterSearch = adj.keys.foldLeft(DfsState()) {(state, n) => if (state.discovered(n)) state else dfsVisit(state, n)}
(stateAfterSearch.tsOrder, stateAfterSearch.isCylic)
}
def topologicalSort: Option[List[Node]] = dfs match {
case (topologicalOrder, false) => Some(topologicalOrder)
case _ => None
}
}