Python 使用 with 语句的多处理映射不会停止
Python multiprocessing map using with statement does not stop
我正在使用 multiprocessing
python 模块来 运行 并行且不相关的作业,其功能类似于以下示例:
import numpy as np
from multiprocessing import Pool
def myFunction(arg1):
name = "file_%s.npy"%arg1
A = np.load(arg1)
A[A<0] = np.nan
np.save(arg1,A)
if(__name__ == "__main__"):
N = list(range(50))
with Pool(4) as p:
p.map_async(myFunction, N)
p.close() # I tried with and without that statement
p.join() # I tried with and without that statement
DoOtherStuff()
我的问题是函数 DoOtherStuff
从未执行,进程在 top
切换到 sleep
模式,我需要用 ctrl[ 终止它=24=]+C 停止.
有什么建议吗?
你至少有几个问题。首先,您使用的是 map_async()
,它 不会 阻塞,直到任务结果完成。因此,您正在做的是使用 map_async()
开始任务,但随后 立即 关闭并终止池(with
语句调用 Pool.terminate()
退出)。
当您使用 map_async
之类的方法将任务添加到进程池时,它会将任务添加到任务队列,该任务队列由工作线程处理,工作线程从该队列中取出任务并将它们分配给工作进程,可能根据需要生成新进程(实际上有一个单独的线程处理)。
重点是,您有一个竞争条件,您可能在任何任务开始之前就终止了 Pool
。如果您希望您的脚本 阻塞 直到所有任务完成,只需使用 map()
而不是 map_async()
。例如,我这样重写了你的脚本:
import numpy as np
from multiprocessing import Pool
def myFunction(N):
A = np.load(f'file_{N:02}.npy')
A[A<0] = np.nan
np.save(f'file2_{N:02}.npy', A)
def DoOtherStuff():
print('done')
if __name__ == "__main__":
N = range(50)
with Pool(4) as p:
p.map(myFunction, N)
DoOtherStuff()
我不知道你的具体用例是什么,但是如果你确实想要使用map_async()
,那么这个任务可以运行在后台做其他事情时,您必须让 Pool
打开,并管理 map_async()
:
返回的 AsyncResult
对象
result = pool.map_async(myFunction, N)
DoOtherStuff()
# Is my map done yet? If not, we should still block until
# it finishes before ending the process
result.wait()
pool.close()
pool.join()
您可以在链接的文档中查看更多示例。
我不知道为什么在您的尝试中遇到了死锁——我无法重现它。有可能在某个时候出现了一个错误,然后修复了,尽管您也可能在竞争条件下调用未定义的行为,以及在已经 join()
ed 之后在池上调用 terminate()
。至于你为什么 your answer 做了任何事情,有可能通过多次调用 apply_async()
你设法绕过了竞争条件,但这根本不能保证有效。
我正在使用 multiprocessing
python 模块来 运行 并行且不相关的作业,其功能类似于以下示例:
import numpy as np
from multiprocessing import Pool
def myFunction(arg1):
name = "file_%s.npy"%arg1
A = np.load(arg1)
A[A<0] = np.nan
np.save(arg1,A)
if(__name__ == "__main__"):
N = list(range(50))
with Pool(4) as p:
p.map_async(myFunction, N)
p.close() # I tried with and without that statement
p.join() # I tried with and without that statement
DoOtherStuff()
我的问题是函数 DoOtherStuff
从未执行,进程在 top
切换到 sleep
模式,我需要用 ctrl[ 终止它=24=]+C 停止.
有什么建议吗?
你至少有几个问题。首先,您使用的是 map_async()
,它 不会 阻塞,直到任务结果完成。因此,您正在做的是使用 map_async()
开始任务,但随后 立即 关闭并终止池(with
语句调用 Pool.terminate()
退出)。
当您使用 map_async
之类的方法将任务添加到进程池时,它会将任务添加到任务队列,该任务队列由工作线程处理,工作线程从该队列中取出任务并将它们分配给工作进程,可能根据需要生成新进程(实际上有一个单独的线程处理)。
重点是,您有一个竞争条件,您可能在任何任务开始之前就终止了 Pool
。如果您希望您的脚本 阻塞 直到所有任务完成,只需使用 map()
而不是 map_async()
。例如,我这样重写了你的脚本:
import numpy as np
from multiprocessing import Pool
def myFunction(N):
A = np.load(f'file_{N:02}.npy')
A[A<0] = np.nan
np.save(f'file2_{N:02}.npy', A)
def DoOtherStuff():
print('done')
if __name__ == "__main__":
N = range(50)
with Pool(4) as p:
p.map(myFunction, N)
DoOtherStuff()
我不知道你的具体用例是什么,但是如果你确实想要使用map_async()
,那么这个任务可以运行在后台做其他事情时,您必须让 Pool
打开,并管理 map_async()
:
AsyncResult
对象
result = pool.map_async(myFunction, N)
DoOtherStuff()
# Is my map done yet? If not, we should still block until
# it finishes before ending the process
result.wait()
pool.close()
pool.join()
您可以在链接的文档中查看更多示例。
我不知道为什么在您的尝试中遇到了死锁——我无法重现它。有可能在某个时候出现了一个错误,然后修复了,尽管您也可能在竞争条件下调用未定义的行为,以及在已经 join()
ed 之后在池上调用 terminate()
。至于你为什么 your answer 做了任何事情,有可能通过多次调用 apply_async()
你设法绕过了竞争条件,但这根本不能保证有效。