multiprocessing.Pool.map 不能并行
multiprocessing.Pool.map does not work in parallel
我想要达到的目标:
Parallelize 每次调用产生多个线程的函数,如下所示:
- PROCESS01 -> 16 Threads
- PROCESS02 -> 16 Threads
- ...
- PROCESSn -> 16 Threads
代码:
with multiprocessing.Pool(4) as process_pool:
results = process_pool.map(do_stuff, [drain_queue()])
其中 drain_queue()
return 项目列表和
do_stuff(item_list):
print('> PID: ' + str(os.getpid()))
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
result_dict = {executor.submit(thread_function, item): item for item in item_list}
for future in concurrent.futures.as_completed(result_dict):
pass
并且 thread_function()
处理传递给它的每个项目。
然而,当执行代码时输出如下:
> PID: 1000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 2000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 3000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 3000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
Here is a screenshot of Task Manager
我在这里错过了什么?我不明白为什么不能按预期工作。
谢谢!
我找到问题了。 map()
的第二个参数应该是一个可迭代的,在我的例子中是 一个包含 单个对象的列表 .
有什么问题吗? 这个:[drain_queue()]
,它生成一个包含单个对象的列表。
在这种情况下,代码
with multiprocessing.Pool(4) as process_pool:
results = process_pool.map(do_stuff, [drain_queue()])
强制multiprocessing.Pool.map
到"distribute"单个对象到单个进程,即使它创建n
个进程,工作仍将由一个进程完成。值得庆幸的是,这与 GIL 限制无关。
我想要达到的目标:
Parallelize 每次调用产生多个线程的函数,如下所示:
- PROCESS01 -> 16 Threads
- PROCESS02 -> 16 Threads
- ...
- PROCESSn -> 16 Threads
代码:
with multiprocessing.Pool(4) as process_pool:
results = process_pool.map(do_stuff, [drain_queue()])
其中 drain_queue()
return 项目列表和
do_stuff(item_list):
print('> PID: ' + str(os.getpid()))
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
result_dict = {executor.submit(thread_function, item): item for item in item_list}
for future in concurrent.futures.as_completed(result_dict):
pass
并且 thread_function()
处理传递给它的每个项目。
然而,当执行代码时输出如下:
> PID: 1000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 2000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 3000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 3000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
Here is a screenshot of Task Manager
我在这里错过了什么?我不明白为什么不能按预期工作。 谢谢!
我找到问题了。 map()
的第二个参数应该是一个可迭代的,在我的例子中是 一个包含 单个对象的列表 .
有什么问题吗? 这个:[drain_queue()]
,它生成一个包含单个对象的列表。
在这种情况下,代码
with multiprocessing.Pool(4) as process_pool:
results = process_pool.map(do_stuff, [drain_queue()])
强制multiprocessing.Pool.map
到"distribute"单个对象到单个进程,即使它创建n
个进程,工作仍将由一个进程完成。值得庆幸的是,这与 GIL 限制无关。