日元的算法实现不选择最短路径
Yen's Algorithm implementation not choosing shortest paths
我正在使用 Yen 算法 (Wikipedia) 在图中查找 k 条最短路径。在下面的示例中,我的图是一个字典,其中每个节点都是一个键,其值是邻居。 Map()
from dotmap
只允许将字典转换为一个对象,在该对象中可以使用点表示法访问键。我想找到从 A 到 F 降序排列的四个最短路径,其中每条边都具有相同的权重。前两个是领带 (A > B > D > F) 和 (A > E > D > F),接下来的两个是 (A > B > C > G > F) 最后是 (A > B > D > C > G > F)。可能是我对 Dijkstra 的实现(尽管没有启发式算法,但仍称为 AStar)有缺陷,因为它 return 在未找到路径时正在创建一个空列表。我怎样才能让我的代码只选择有效路径?目前它 returns [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], [], []]
-- 它应该 return [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], ['A', 'B', 'C', 'G', 'F'], ['A', 'B', 'D', 'C', 'G', 'F']]
这是最短的路径。
import copy
import heapq
from dotmap import Map
from itertools import count
graph = {
'A': ['B', 'E'],
'B': ['C', 'D'],
'C': ['G'],
'D': ['C', 'F'],
'E': ['D'],
'F': [],
'G': ['F']
}
class PriorityQueue:
def __init__(self):
self.elements = []
self._counter = count()
def empty(self):
return len(self.elements) == 0
def put(self, item, priority):
heapq.heappush(self.elements, (priority, item,))
def get(self):
return heapq.heappop(self.elements)[1]
class AStar:
def __init__(self, graph, start, goals=[]):
self.graph = graph
self.start = start
self.frontier = PriorityQueue()
self.frontier.put(start, 0)
self.previous = {}
self.previous[start] = None
self.costs = {}
self.costs[start] = 0
self.final_path = None
self.goals = goals
self.goal = None
def search(self):
graph = self.graph
frontier = self.frontier
goals = self.goals
costs = self.costs
while not frontier.empty():
state = frontier.get()
if state in goals:
cost = self.costs[state]
self.goal = state
self.final_path = self.trace_path()
return Map({'path': self.final_path, 'cost': cost})
for next_state in graph[state]:
new_cost = costs[state] + 1
if next_state not in costs or new_cost < costs[next_state]:
costs[next_state] = new_cost
priority = new_cost
frontier.put(next_state, priority)
self.previous[next_state] = state
# No path found
return Map({'path': [], 'cost': 0})
def trace_path(self):
current = self.goal
path = []
while current != self.start:
path.append(current)
current = self.previous[current]
path.append(self.start)
path.reverse()
return path
def YenKSP(graph, source, sink, k_paths):
graph_clone = copy.deepcopy(graph)
A = [AStar(graph, source, sink).search().path]
B = []
for k in range(1, k_paths):
for i in range(len(A[-1]) - 1):
spur_node = A[-1][i]
root_path = A[-1][:i+1]
for path in A:
if len(path) > i and root_path == path[:i+1]:
graph_clone[path[i]].remove(path[i+1])
result = AStar(graph_clone, spur_node, sink).search()
spur_path = result.path
total_path = root_path[:-1] + spur_path
spur_cost = AStar(graph_clone, source, spur_node).search().cost
B.append(Map({'path': total_path, 'cost': result.cost + spur_cost}))
graph_clone = copy.deepcopy(graph)
if len(B) == 0:
break
B.sort(key=lambda p: (p.cost, len(p.path)))
A.append(B[0].path)
B.pop()
return A
paths = YenKSP(graph, 'A', 'F', 4)
print(paths)
import copy
import heapq
#from dotmap import Map
from itertools import count
class Map(dict):
def __getattr__(self, k):
return self[k]
def __setattr__(self, k, v):
self[k] = v
graph = {
'A': ['B', 'E'],
'B': ['C', 'D'],
'C': ['G'],
'D': ['C', 'F'],
'E': ['D'],
'F': [],
'G': ['F']
}
class PriorityQueue:
def __init__(self):
self.elements = []
self._counter = count()
def empty(self):
return len(self.elements) == 0
def put(self, item, priority):
heapq.heappush(self.elements, (priority, item,))
def get(self):
return heapq.heappop(self.elements)[1]
class AStar:
def __init__(self, graph, start, goals=[]):
self.graph = graph
self.start = start
self.frontier = PriorityQueue()
self.frontier.put(start, 0)
self.previous = {}
self.previous[start] = None
self.costs = {}
self.costs[start] = 0
self.final_path = None
self.goals = goals
self.goal = None
def search(self):
graph = self.graph
frontier = self.frontier
goals = self.goals
costs = self.costs
while not frontier.empty():
state = frontier.get()
if state in goals:
cost = self.costs[state]
self.goal = state
self.final_path = self.trace_path()
return Map({'path': self.final_path, 'cost': cost})
for next_state in graph[state]:
new_cost = costs[state] + 1
if next_state not in costs or new_cost < costs[next_state]:
costs[next_state] = new_cost
priority = new_cost
frontier.put(next_state, priority)
self.previous[next_state] = state
# No path found
return Map({'path': [], 'cost': float('inf')})
def trace_path(self):
current = self.goal
path = []
while current != self.start:
path.append(current)
current = self.previous[current]
path.append(self.start)
path.reverse()
return path
def YenKSP(graph, source, sink, k_paths):
A = [AStar(graph, source, sink).search().path]
B = []
for _ in range(1, k_paths):
for i in range(len(A[-1]) - 1):
graph_clone = copy.deepcopy(graph)
spur_node = A[-1][i]
root_path = A[-1][:i+1]
for path in A:
if len(path) > i and root_path == path[:i+1]:
if path[i+1] in graph_clone[path[i]]:
graph_clone[path[i]].remove(path[i+1])
result = AStar(graph_clone, spur_node, sink).search()
spur_path = result.path
total_path = root_path[:-1] + spur_path
spur_cost = AStar(graph_clone, source, spur_node).search().cost
B.append(Map({'path': total_path, 'cost': result.cost + spur_cost}))
if len(B) == 0:
break
B.sort(key=lambda p: (p.cost, len(p.path)))
best_b = B.pop(0)
if best_b.cost != float('inf'):
A.append(best_b.path)
return A
paths = YenKSP(graph, 'A', 'F', 4)
print(paths)
生产:
[['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], ['A', 'B', 'C', 'G', 'F'], ['A', 'B', 'D', 'C', 'G', 'F']]
主要问题是,当找不到路径时,您的默认值会返回一条成本为 0 的路径。因此,当按路径成本排序时,这些路径在 B
中显示为最佳选择,并被添加到 A
中。我将默认路径成本更改为 float('inf')
。这样做揭示了当您尝试从 graph_clone
(在 for path in A: ...
内)删除相同边两次时可能发生的错误,因此我添加了一个 if
检查以有条件地删除边。最后两件事 the diff 表明我所做的是 (a) 模仿你的 dotmap.Map
class (你可以删除它并取消注释导入),以及 (b) 只添加一个路径到结果集 A
如果成本是有限的。
我正在使用 Yen 算法 (Wikipedia) 在图中查找 k 条最短路径。在下面的示例中,我的图是一个字典,其中每个节点都是一个键,其值是邻居。 Map()
from dotmap
只允许将字典转换为一个对象,在该对象中可以使用点表示法访问键。我想找到从 A 到 F 降序排列的四个最短路径,其中每条边都具有相同的权重。前两个是领带 (A > B > D > F) 和 (A > E > D > F),接下来的两个是 (A > B > C > G > F) 最后是 (A > B > D > C > G > F)。可能是我对 Dijkstra 的实现(尽管没有启发式算法,但仍称为 AStar)有缺陷,因为它 return 在未找到路径时正在创建一个空列表。我怎样才能让我的代码只选择有效路径?目前它 returns [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], [], []]
-- 它应该 return [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], ['A', 'B', 'C', 'G', 'F'], ['A', 'B', 'D', 'C', 'G', 'F']]
这是最短的路径。
import copy
import heapq
from dotmap import Map
from itertools import count
graph = {
'A': ['B', 'E'],
'B': ['C', 'D'],
'C': ['G'],
'D': ['C', 'F'],
'E': ['D'],
'F': [],
'G': ['F']
}
class PriorityQueue:
def __init__(self):
self.elements = []
self._counter = count()
def empty(self):
return len(self.elements) == 0
def put(self, item, priority):
heapq.heappush(self.elements, (priority, item,))
def get(self):
return heapq.heappop(self.elements)[1]
class AStar:
def __init__(self, graph, start, goals=[]):
self.graph = graph
self.start = start
self.frontier = PriorityQueue()
self.frontier.put(start, 0)
self.previous = {}
self.previous[start] = None
self.costs = {}
self.costs[start] = 0
self.final_path = None
self.goals = goals
self.goal = None
def search(self):
graph = self.graph
frontier = self.frontier
goals = self.goals
costs = self.costs
while not frontier.empty():
state = frontier.get()
if state in goals:
cost = self.costs[state]
self.goal = state
self.final_path = self.trace_path()
return Map({'path': self.final_path, 'cost': cost})
for next_state in graph[state]:
new_cost = costs[state] + 1
if next_state not in costs or new_cost < costs[next_state]:
costs[next_state] = new_cost
priority = new_cost
frontier.put(next_state, priority)
self.previous[next_state] = state
# No path found
return Map({'path': [], 'cost': 0})
def trace_path(self):
current = self.goal
path = []
while current != self.start:
path.append(current)
current = self.previous[current]
path.append(self.start)
path.reverse()
return path
def YenKSP(graph, source, sink, k_paths):
graph_clone = copy.deepcopy(graph)
A = [AStar(graph, source, sink).search().path]
B = []
for k in range(1, k_paths):
for i in range(len(A[-1]) - 1):
spur_node = A[-1][i]
root_path = A[-1][:i+1]
for path in A:
if len(path) > i and root_path == path[:i+1]:
graph_clone[path[i]].remove(path[i+1])
result = AStar(graph_clone, spur_node, sink).search()
spur_path = result.path
total_path = root_path[:-1] + spur_path
spur_cost = AStar(graph_clone, source, spur_node).search().cost
B.append(Map({'path': total_path, 'cost': result.cost + spur_cost}))
graph_clone = copy.deepcopy(graph)
if len(B) == 0:
break
B.sort(key=lambda p: (p.cost, len(p.path)))
A.append(B[0].path)
B.pop()
return A
paths = YenKSP(graph, 'A', 'F', 4)
print(paths)
import copy
import heapq
#from dotmap import Map
from itertools import count
class Map(dict):
def __getattr__(self, k):
return self[k]
def __setattr__(self, k, v):
self[k] = v
graph = {
'A': ['B', 'E'],
'B': ['C', 'D'],
'C': ['G'],
'D': ['C', 'F'],
'E': ['D'],
'F': [],
'G': ['F']
}
class PriorityQueue:
def __init__(self):
self.elements = []
self._counter = count()
def empty(self):
return len(self.elements) == 0
def put(self, item, priority):
heapq.heappush(self.elements, (priority, item,))
def get(self):
return heapq.heappop(self.elements)[1]
class AStar:
def __init__(self, graph, start, goals=[]):
self.graph = graph
self.start = start
self.frontier = PriorityQueue()
self.frontier.put(start, 0)
self.previous = {}
self.previous[start] = None
self.costs = {}
self.costs[start] = 0
self.final_path = None
self.goals = goals
self.goal = None
def search(self):
graph = self.graph
frontier = self.frontier
goals = self.goals
costs = self.costs
while not frontier.empty():
state = frontier.get()
if state in goals:
cost = self.costs[state]
self.goal = state
self.final_path = self.trace_path()
return Map({'path': self.final_path, 'cost': cost})
for next_state in graph[state]:
new_cost = costs[state] + 1
if next_state not in costs or new_cost < costs[next_state]:
costs[next_state] = new_cost
priority = new_cost
frontier.put(next_state, priority)
self.previous[next_state] = state
# No path found
return Map({'path': [], 'cost': float('inf')})
def trace_path(self):
current = self.goal
path = []
while current != self.start:
path.append(current)
current = self.previous[current]
path.append(self.start)
path.reverse()
return path
def YenKSP(graph, source, sink, k_paths):
A = [AStar(graph, source, sink).search().path]
B = []
for _ in range(1, k_paths):
for i in range(len(A[-1]) - 1):
graph_clone = copy.deepcopy(graph)
spur_node = A[-1][i]
root_path = A[-1][:i+1]
for path in A:
if len(path) > i and root_path == path[:i+1]:
if path[i+1] in graph_clone[path[i]]:
graph_clone[path[i]].remove(path[i+1])
result = AStar(graph_clone, spur_node, sink).search()
spur_path = result.path
total_path = root_path[:-1] + spur_path
spur_cost = AStar(graph_clone, source, spur_node).search().cost
B.append(Map({'path': total_path, 'cost': result.cost + spur_cost}))
if len(B) == 0:
break
B.sort(key=lambda p: (p.cost, len(p.path)))
best_b = B.pop(0)
if best_b.cost != float('inf'):
A.append(best_b.path)
return A
paths = YenKSP(graph, 'A', 'F', 4)
print(paths)
生产:
[['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], ['A', 'B', 'C', 'G', 'F'], ['A', 'B', 'D', 'C', 'G', 'F']]
主要问题是,当找不到路径时,您的默认值会返回一条成本为 0 的路径。因此,当按路径成本排序时,这些路径在 B
中显示为最佳选择,并被添加到 A
中。我将默认路径成本更改为 float('inf')
。这样做揭示了当您尝试从 graph_clone
(在 for path in A: ...
内)删除相同边两次时可能发生的错误,因此我添加了一个 if
检查以有条件地删除边。最后两件事 the diff 表明我所做的是 (a) 模仿你的 dotmap.Map
class (你可以删除它并取消注释导入),以及 (b) 只添加一个路径到结果集 A
如果成本是有限的。