给定一个无环有向图,return 节点集合的集合 "at the same level"?
Given an acyclic directed graph, return a collection of collections of nodes "at the same level"?
首先我不确定这种算法叫什么,这是主要问题 - 所以问题的第一部分是这个算法叫什么?
基本上我有一个 DiGraph()
,我在其中插入节点 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
和边 ([1,3],[2,3],[3,5],[4,5],[5,7],[6,7],[7,8],[7,9],[7,10])
由此我想知道是否有可能得到一个集合如下:[[1, 2, 4, 6], [3], [5], [7], [8, 9, 10]]
编辑:如果有帮助,让我添加一些约束条件。
- 没有循环,这是有保证的
- 图表没有一个起点
我想做的是收集同一级别的节点,以便可以并行处理它们,但在外部集合中,处理是串行的。
EDIT2:很明显我没有充分考虑这个问题,所以描述 "level" 的最简单方法是 最深的前任 ,所有具有前辈的深度相同。所以上面列表中的第一个条目是所有以 0 作为最深前驱的节点,第二个是 1,第三个是 2,依此类推。在每个列表中,siblings 的顺序无关紧要,因为它们将被并行处理。
为什么bfs不解决呢? bfs 算法是广度遍历算法,即它遍历树级别。这也意味着,一次遍历同一级别的所有节点,这是您想要的输出。
正如评论中指出的那样,这将假设图中的起点。
您的问题表明您希望此图的输出为 [[1, 2, 4, 6], [3], [5], [7], [8, 9, 10]]
。 IIUC,格局如下:
[1, 2, 4, 6]
是没有边的节点。
[3]
是没有边的节点,假设之前的所有节点都被擦除。
[4]
是没有边的节点,假设之前的所有节点都被擦除。
等(直到所有节点都被擦除)
假设我们从
开始
g = networkx.DiGraph()
g.add_edges_from([[1,3],[2,3],[3,5],[4,5],[5,7],[6,7],[7,8],[7,9],[7,10]])
然后我们可以将其编码为
def find_levels(g):
levels = []
while g.nodes():
no_in_nodes = [n for (n, d) in g.in_degree(g.nodes()).items() if d == 0]
levels.append(no_in_nodes)
for n in no_in_nodes:
g.remove_node(n)
return levels
如果我们运行这个,我们得到结果:
>>> find_levels(g)
[[1, 2, 4, 6], [3], [5], [7], [8, 9, 10]]
这里的复杂度是Θ(|V|2 + |E|)。可以使用 Fibonnacci Heap 构建更复杂的版本。基本上,所有顶点都需要放入一个堆中,每一层都由度数为 0 的顶点组成。每次弹出一个,并删除其他顶点的边,我们可以将其转换为堆减少键操作(减少剩余顶点的入度)。这会将 运行ning 时间减少到 Θ(|V| log(|V|) + |E|).
正如 Ami 所说,拓扑排序将实现这一点。下面是一个Boost Graph Library的实现,没有上下文,但是可以提取伪代码。 toporder
对象只是为拓扑排序提供迭代器。如果需要,我可以提取通用算法。
template<typename F>
void
scheduler<F>::set_run_levels()
{
run_levels = std::vector<int>(tasks.size(), 0);
Vertexcont toporder;
try
{
topological_sort(g, std::front_inserter(toporder));
}
catch(std::exception &e)
{
std::cerr << e.what() << "\n";
std::cerr << "You most likely have a cycle...\n";
exit(1);
}
vContIt i = toporder.begin();
for(;
i != toporder.end();
++i)
{
if (in_degree(*i,g) > 0)
{
inIt j, j_end;
int maxdist = 0;
for(boost::tie(j,j_end) = in_edges(*i,g);
j != j_end;
++j)
{
maxdist = (std::max)(run_levels[source(*j,g)], maxdist);
run_levels[*i] = maxdist+1;
}
}
}
}
我想我曾将此应用于同一问题,然后意识到这是不必要的。只需在一触即发的情况下设置任务,所有任务都会向其家属发出完成信号(通过 condition_variable,承诺)。所以我所需要的只是了解每个任务的依赖关系,找到初始任务,然后启动。您的情况是否需要 run_level
的完整规格?
首先我不确定这种算法叫什么,这是主要问题 - 所以问题的第一部分是这个算法叫什么?
基本上我有一个 DiGraph()
,我在其中插入节点 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
和边 ([1,3],[2,3],[3,5],[4,5],[5,7],[6,7],[7,8],[7,9],[7,10])
由此我想知道是否有可能得到一个集合如下:[[1, 2, 4, 6], [3], [5], [7], [8, 9, 10]]
编辑:如果有帮助,让我添加一些约束条件。 - 没有循环,这是有保证的 - 图表没有一个起点
我想做的是收集同一级别的节点,以便可以并行处理它们,但在外部集合中,处理是串行的。
EDIT2:很明显我没有充分考虑这个问题,所以描述 "level" 的最简单方法是 最深的前任 ,所有具有前辈的深度相同。所以上面列表中的第一个条目是所有以 0 作为最深前驱的节点,第二个是 1,第三个是 2,依此类推。在每个列表中,siblings 的顺序无关紧要,因为它们将被并行处理。
为什么bfs不解决呢? bfs 算法是广度遍历算法,即它遍历树级别。这也意味着,一次遍历同一级别的所有节点,这是您想要的输出。 正如评论中指出的那样,这将假设图中的起点。
您的问题表明您希望此图的输出为 [[1, 2, 4, 6], [3], [5], [7], [8, 9, 10]]
。 IIUC,格局如下:
[1, 2, 4, 6]
是没有边的节点。[3]
是没有边的节点,假设之前的所有节点都被擦除。[4]
是没有边的节点,假设之前的所有节点都被擦除。等(直到所有节点都被擦除)
假设我们从
开始g = networkx.DiGraph()
g.add_edges_from([[1,3],[2,3],[3,5],[4,5],[5,7],[6,7],[7,8],[7,9],[7,10]])
然后我们可以将其编码为
def find_levels(g):
levels = []
while g.nodes():
no_in_nodes = [n for (n, d) in g.in_degree(g.nodes()).items() if d == 0]
levels.append(no_in_nodes)
for n in no_in_nodes:
g.remove_node(n)
return levels
如果我们运行这个,我们得到结果:
>>> find_levels(g)
[[1, 2, 4, 6], [3], [5], [7], [8, 9, 10]]
这里的复杂度是Θ(|V|2 + |E|)。可以使用 Fibonnacci Heap 构建更复杂的版本。基本上,所有顶点都需要放入一个堆中,每一层都由度数为 0 的顶点组成。每次弹出一个,并删除其他顶点的边,我们可以将其转换为堆减少键操作(减少剩余顶点的入度)。这会将 运行ning 时间减少到 Θ(|V| log(|V|) + |E|).
正如 Ami 所说,拓扑排序将实现这一点。下面是一个Boost Graph Library的实现,没有上下文,但是可以提取伪代码。 toporder
对象只是为拓扑排序提供迭代器。如果需要,我可以提取通用算法。
template<typename F>
void
scheduler<F>::set_run_levels()
{
run_levels = std::vector<int>(tasks.size(), 0);
Vertexcont toporder;
try
{
topological_sort(g, std::front_inserter(toporder));
}
catch(std::exception &e)
{
std::cerr << e.what() << "\n";
std::cerr << "You most likely have a cycle...\n";
exit(1);
}
vContIt i = toporder.begin();
for(;
i != toporder.end();
++i)
{
if (in_degree(*i,g) > 0)
{
inIt j, j_end;
int maxdist = 0;
for(boost::tie(j,j_end) = in_edges(*i,g);
j != j_end;
++j)
{
maxdist = (std::max)(run_levels[source(*j,g)], maxdist);
run_levels[*i] = maxdist+1;
}
}
}
}
我想我曾将此应用于同一问题,然后意识到这是不必要的。只需在一触即发的情况下设置任务,所有任务都会向其家属发出完成信号(通过 condition_variable,承诺)。所以我所需要的只是了解每个任务的依赖关系,找到初始任务,然后启动。您的情况是否需要 run_level
的完整规格?