Creating threads in Python multithreading
I have the following thread structure:
         (1)
       /  |  \
    (2)  (3)  (4)
     |    |    |
    (5)  (6)  (7)
     |    |    |
    (8)  (9)  (10)
     |    |    |
   (11) (12) (13)
      \  /     |
      (14)     |
          \   /
          (15)
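As you can see, the first function starts three threads, and each of those then starts a new thread. Node 14 is the join of nodes 11 and 12; node 15 is the join of nodes 13 and 14.
I implemented the first two levels (nodes 1, 2, 3, 4) as follows: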
self.first()
items = ['a', 'b', 'c']  # renamed from `list` to avoid shadowing the built-in
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(self.do_this, items)
    for result in results:
        print(result)
But I don't know where to go from here.
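Here is an example of a runner for a graph like yours.
The idea is to define a function that runs each task (do_task here) and to build a graph of the (direct) dependencies each task requires. The example task_deps below mirrors the graph above.
The run_graph function then calls do_task with each task ID; that function should do whatever is needed to compute the result (it can read any previously computed results if required).
run_graph finally returns a dict of {task_id: result}.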
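To make the point about reading earlier results concrete, here is a minimal sketch of a runner that combines the values of its direct dependencies. The name sum_task and the sample values are hypothetical and not part of the answer's code; the function only keeps the same (task_id, results, dependencies) signature that run_graph passes to its runner.

```python
def sum_task(task_id, results, dependencies):
    # Hypothetical runner: a task's value is its own ID plus the values
    # of its direct dependencies, looked up in the shared results dict.
    # The dict is only read here, never modified.
    return task_id + sum(results[dep] for dep in dependencies)


# Simulate the state at the moment task 14 is scheduled: tasks 11 and 12
# have already finished and stored their (made-up) values.
results = {11: 110, 12: 120}
value_14 = sum_task(14, results, (11, 12))
print(value_14)  # 14 + 110 + 120 = 244
```

A runner like this should treat results as read-only and leave all writes to run_graph, which stores each value once the corresponding future has finished.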
The code below prints
Scheduling {1}
Scheduling {2, 3, 4}
Scheduling {5, 6, 7}
Scheduling {8, 9, 10}
Scheduling {11, 12, 13}
Scheduling {14}
Scheduling {15}
which presumably corresponds exactly to the top-to-bottom structure of your graph, and
{1: 'Task 1 completed with result 42',
2: 'Task 2 completed with result 84',
3: 'Task 3 completed with result 126',
4: 'Task 4 completed with result 168',
5: 'Task 5 completed with result 210',
6: 'Task 6 completed with result 252',
7: 'Task 7 completed with result 294',
8: 'Task 8 completed with result 336',
9: 'Task 9 completed with result 378',
10: 'Task 10 completed with result 420',
11: 'Task 11 completed with result 462',
12: 'Task 12 completed with result 504',
13: 'Task 13 completed with result 546',
14: 'Task 14 completed with result 588',
15: 'Task 15 completed with result 630'}
import concurrent.futures


def do_task(task_id, results, dependencies):
    # sanity check - this function could use `dependencies` and `results` too
    assert all(dep in results for dep in dependencies)
    return f"Task {task_id} completed with result {task_id * 42}"


def run_graph(task_dependencies, runner):
    # Dict for results for each task.
    results = {}
    # Set of tasks yet to be completed.
    todo = set(task_dependencies)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # While there are items in the to-do set...
        while todo:
            # ... figure out what we can immediately execute by
            # comparing the dependency set to the result keys we already have
            # (i.e. the complement of the to-do set)
            next_tasks = {
                task_id
                for (task_id, deps) in task_dependencies.items()
                if task_id in todo and set(deps) <= set(results)
            }
            # If there are no next tasks we could schedule, it means the dependency
            # graph is incorrect (or at the very least impossible to complete).
            if not next_tasks:
                raise RuntimeError(
                    f"Unable to schedule tasks, bad dependencies? Todo: {todo}"
                )
            print("Scheduling", next_tasks)
            # Submit tasks for execution in parallel. `futures` will be a list of
            # 2-tuples (task_id, future).
            futures = [
                (
                    task_id,
                    executor.submit(
                        runner, task_id, results, task_dependencies[task_id]
                    ),
                )
                for task_id in next_tasks
            ]
            # Loop over the futures, waiting for their results; when a future
            # finishes, save the result value and remove that task from the
            # to-do set.
            for (task_id, future) in futures:
                results[task_id] = future.result()
                todo.remove(task_id)
    # Once the while loop finishes, we have our results.
    return results


if __name__ == "__main__":
    task_deps = {
        1: (),
        2: (1,),
        3: (1,),
        4: (1,),
        5: (2,),
        6: (3,),
        7: (4,),
        8: (5,),
        9: (6,),
        10: (7,),
        11: (8,),
        12: (9,),
        13: (10,),
        14: (11, 12),
        15: (14, 13),
    }
    result = run_graph(task_deps, do_task)
    print(result)
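The runner above works level by level: every task in a batch has to finish before the next batch is scheduled. If that matters, one possible variation (not part of the answer above) is to submit each task as soon as its own dependencies are done. The sketch below assumes Python 3.9+ for the standard-library graphlib module; the names run_graph_eager and add_up are hypothetical.

```python
import concurrent.futures
import graphlib  # standard library since Python 3.9


def run_graph_eager(task_dependencies, runner):
    # Hypothetical variant of run_graph: same inputs, same result dict,
    # but tasks are submitted as soon as their dependencies have finished.
    results = {}
    sorter = graphlib.TopologicalSorter(task_dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    pending = {}  # maps each running future to its task ID
    with concurrent.futures.ThreadPoolExecutor() as executor:
        while sorter.is_active():
            # Submit every task whose dependencies are all marked done.
            for task_id in sorter.get_ready():
                future = executor.submit(
                    runner, task_id, results, task_dependencies[task_id]
                )
                pending[future] = task_id
            # Block until at least one running task finishes.
            done, _ = concurrent.futures.wait(
                list(pending),
                return_when=concurrent.futures.FIRST_COMPLETED,
            )
            for future in done:
                task_id = pending.pop(future)
                results[task_id] = future.result()
                sorter.done(task_id)  # may unlock dependent tasks
    return results


def add_up(task_id, results, dependencies):
    # Hypothetical runner: own ID plus the values of the dependencies.
    return task_id + sum(results[dep] for dep in dependencies)


# Small diamond-shaped graph: 4 waits for both 2 and 3.
demo_deps = {1: (), 2: (1,), 3: (1,), 4: (2, 3)}
demo_results = run_graph_eager(demo_deps, add_up)
print(sorted(demo_results.items()))  # [(1, 1), (2, 3), (3, 4), (4, 11)]
```

For the graph in the question the two approaches should give the same results; the difference only shows when branches take unequal time, since here task 14 could start as soon as 11 and 12 finish, without waiting for 13.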