Python 中的多处理:并行化 for 循环以填充 Numpy 数组
Multiprocessing in Python: Parallelize a for loop to fill a Numpy array
我一直在阅读 this one 之类的话题,但其中任何一个似乎都适合我的情况。我正在尝试并行化以下玩具示例,以在 Python:
中使用多处理在 for 循环中填充 Numpy 数组
import numpy as np
from multiprocessing import Pool
import time
def func1(x, y=1):
return x**2 + y
def func2(n, parallel=False):
my_array = np.zeros((n))
# Parallelized version:
if parallel:
pool = Pool(processes=6)
for idx, val in enumerate(range(1, n+1)):
result = pool.apply_async(func1, [val])
my_array[idx] = result.get()
pool.close()
# Not parallelized version:
else:
for i in range(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(60000)
end = time.time()
print(my_array)
print("Normal time: {}\n".format(end-start))
start_parallel = time.time()
my_array_parallelized = func2(60000, parallel=True)
end_parallel = time.time()
print(my_array_parallelized)
print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))
if __name__ == '__main__':
main()
基于 Multiprocessing 的代码行似乎有效,并为您提供了正确的结果。但是,它比非并行化版本花费的时间要长得多。这是两个版本的输出:
[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
3.60000e+09]
Normal time: 0.01605963706970215
[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
3.60000e+09]
Time based on multiprocessing: 2.8775112628936768
我的直觉告诉我,这应该是从 pool.apply_async() 捕获结果的更好方法。我究竟做错了什么?实现这一目标的最有效方法是什么?谢谢
创建流程的成本很高。在我的机器上,每个创建的进程至少需要几百微秒。此外,多处理模块在进程之间复制要计算的数据,然后从进程池中收集结果。 进程间通信也很慢。问题是你的计算是微不足道的,可以很快完成,可能比所有引入的开销快得多。 multiprocessing 模块仅在处理非常小的数据集并执行密集计算(与计算数据量相比)时才有用。
希望,当涉及到使用 Numpy 的 numericals 计算时,有一种简单快速的方法可以并行化您的应用程序:Numba JIT。如果您显式使用并行结构(parallel=True
和 prange
),Numba 可以并行化代码。它使用 线程 而不是在 共享内存 中工作的繁重进程。如果您的代码不处理 本机类型和 Numpy 数组 而不是纯 Python 动态对象(列表,大整数,类,等等)。这是一个例子:
import numpy as np
import numba as nb
import time
@nb.njit
def func1(x, y=1):
return x**2 + y
@nb.njit('float64[:](int64)', parallel=True)
def func2(n):
my_array = np.zeros(n)
for i in nb.prange(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(60000)
end = time.time()
print(my_array)
print("Numba time: {}\n".format(end-start))
if __name__ == '__main__':
main()
因为 Numba 在运行时编译代码,所以它能够将循环完全优化为无操作,在这种情况下导致时间接近 0 秒。
这是@thisisalsomypassword 提出的解决方案,它改进了我最初的提议。也就是说,“在循环内的列表中收集 AsyncResult
对象,然后在每个结果对象上启动所有进程后调用 AsyncResult.get()
”:
import numpy as np
from multiprocessing import Pool
import time
def func1(x, y=1):
time.sleep(0.1)
return x**2 + y
def func2(n, parallel=False):
my_array = np.zeros((n))
# Parallelized version:
if parallel:
pool = Pool(processes=6)
####### HERE COMES THE CHANGE #######
results = [pool.apply_async(func1, [val]) for val in range(1, n+1)]
for idx, val in enumerate(results):
my_array[idx] = val.get()
#######
pool.close()
# Not parallelized version:
else:
for i in range(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(600)
end = time.time()
print(my_array)
print("Normal time: {}\n".format(end-start))
start_parallel = time.time()
my_array_parallelized = func2(600, parallel=True)
end_parallel = time.time()
print(my_array_parallelized)
print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))
if __name__ == '__main__':
main()
现在可以了。多处理大大减少了时间:
Normal time: 60.107836008071
Time based on multiprocessing: 10.049324989318848
time.sleep(0.1)
已添加到 func1
中,以抵消成为超级琐碎任务的影响。
我一直在阅读 this one 之类的话题,但其中任何一个似乎都适合我的情况。我正在尝试并行化以下玩具示例,以在 Python:
中使用多处理在 for 循环中填充 Numpy 数组import numpy as np
from multiprocessing import Pool
import time
def func1(x, y=1):
return x**2 + y
def func2(n, parallel=False):
my_array = np.zeros((n))
# Parallelized version:
if parallel:
pool = Pool(processes=6)
for idx, val in enumerate(range(1, n+1)):
result = pool.apply_async(func1, [val])
my_array[idx] = result.get()
pool.close()
# Not parallelized version:
else:
for i in range(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(60000)
end = time.time()
print(my_array)
print("Normal time: {}\n".format(end-start))
start_parallel = time.time()
my_array_parallelized = func2(60000, parallel=True)
end_parallel = time.time()
print(my_array_parallelized)
print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))
if __name__ == '__main__':
main()
基于 Multiprocessing 的代码行似乎有效,并为您提供了正确的结果。但是,它比非并行化版本花费的时间要长得多。这是两个版本的输出:
[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
3.60000e+09]
Normal time: 0.01605963706970215
[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
3.60000e+09]
Time based on multiprocessing: 2.8775112628936768
我的直觉告诉我,这应该是从 pool.apply_async() 捕获结果的更好方法。我究竟做错了什么?实现这一目标的最有效方法是什么?谢谢
创建流程的成本很高。在我的机器上,每个创建的进程至少需要几百微秒。此外,多处理模块在进程之间复制要计算的数据,然后从进程池中收集结果。 进程间通信也很慢。问题是你的计算是微不足道的,可以很快完成,可能比所有引入的开销快得多。 multiprocessing 模块仅在处理非常小的数据集并执行密集计算(与计算数据量相比)时才有用。
希望,当涉及到使用 Numpy 的 numericals 计算时,有一种简单快速的方法可以并行化您的应用程序:Numba JIT。如果您显式使用并行结构(parallel=True
和 prange
),Numba 可以并行化代码。它使用 线程 而不是在 共享内存 中工作的繁重进程。如果您的代码不处理 本机类型和 Numpy 数组 而不是纯 Python 动态对象(列表,大整数,类,等等)。这是一个例子:
import numpy as np
import numba as nb
import time
@nb.njit
def func1(x, y=1):
return x**2 + y
@nb.njit('float64[:](int64)', parallel=True)
def func2(n):
my_array = np.zeros(n)
for i in nb.prange(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(60000)
end = time.time()
print(my_array)
print("Numba time: {}\n".format(end-start))
if __name__ == '__main__':
main()
因为 Numba 在运行时编译代码,所以它能够将循环完全优化为无操作,在这种情况下导致时间接近 0 秒。
这是@thisisalsomypassword 提出的解决方案,它改进了我最初的提议。也就是说,“在循环内的列表中收集 AsyncResult
对象,然后在每个结果对象上启动所有进程后调用 AsyncResult.get()
”:
import numpy as np
from multiprocessing import Pool
import time
def func1(x, y=1):
time.sleep(0.1)
return x**2 + y
def func2(n, parallel=False):
my_array = np.zeros((n))
# Parallelized version:
if parallel:
pool = Pool(processes=6)
####### HERE COMES THE CHANGE #######
results = [pool.apply_async(func1, [val]) for val in range(1, n+1)]
for idx, val in enumerate(results):
my_array[idx] = val.get()
#######
pool.close()
# Not parallelized version:
else:
for i in range(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(600)
end = time.time()
print(my_array)
print("Normal time: {}\n".format(end-start))
start_parallel = time.time()
my_array_parallelized = func2(600, parallel=True)
end_parallel = time.time()
print(my_array_parallelized)
print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))
if __name__ == '__main__':
main()
现在可以了。多处理大大减少了时间:
Normal time: 60.107836008071
Time based on multiprocessing: 10.049324989318848
time.sleep(0.1)
已添加到 func1
中,以抵消成为超级琐碎任务的影响。