如何在python多进程中实现reduce操作?
How to implement a reduce operation in python multiprocessing?
我是 OpenMP 和 C++ 方面的并行程序员专家。现在我正在尝试了解 python 和 multiprocessing
库中的并行性。
特别是,我试图并行化这个简单的代码,它随机递增一个数组 100 次:
from random import randint
import multiprocessing as mp
import numpy as np
def random_add(x):
x[randint(0,len(x)-1)] += 1
if __name__ == "__main__":
print("Serial")
x = np.zeros(8)
for i in range(100):
random_add(x)
print(x)
print("Parallel")
x = np.zeros(8)
processes = [mp.Process(target = random_add, args=(x,)) for i in range(100)]
for p in processes:
p.start()
print(x)
但是,这是以下输出:
Serial
[ 9. 18. 11. 15. 16. 8. 10. 13.]
Parallel
[ 0. 0. 0. 0. 0. 0. 0. 0.]
为什么会这样?好吧,我想我有一个解释:因为我们处于多处理(而不是多线程)中,每个进程都是他自己的内存部分,即每个派生的进程都有他自己的 x
,它被销毁一次 random_add(x)
被终止。作为结论,主程序中的 x
从未真正更新过。
这是正确的吗?如果是这样,我该如何解决这个问题?简而言之,我需要一个全局化简操作,将所有 random_add
调用的结果相加,获得与串行版本相同的结果。
您应该在您的案例中使用共享内存对象:
from random import randint
import multiprocessing as mp
def random_add(x):
x[randint(0,len(x)-1)] += 1
if __name__ == "__main__":
print("Serial")
x = [0]*8
for i in range(100):
random_add(x)
print(x)
print("Parallel")
x = mp.Array('i', range(8))
processes = [mp.Process(target = random_add, args=(x,)) for i in range(100)]
for p in processes:
p.start()
print(x[:])
为了代码清晰,我已经将 numpy 数组更改为序数列表
我是 OpenMP 和 C++ 方面的并行程序员专家。现在我正在尝试了解 python 和 multiprocessing
库中的并行性。
特别是,我试图并行化这个简单的代码,它随机递增一个数组 100 次:
from random import randint
import multiprocessing as mp
import numpy as np
def random_add(x):
x[randint(0,len(x)-1)] += 1
if __name__ == "__main__":
print("Serial")
x = np.zeros(8)
for i in range(100):
random_add(x)
print(x)
print("Parallel")
x = np.zeros(8)
processes = [mp.Process(target = random_add, args=(x,)) for i in range(100)]
for p in processes:
p.start()
print(x)
但是,这是以下输出:
Serial
[ 9. 18. 11. 15. 16. 8. 10. 13.]
Parallel
[ 0. 0. 0. 0. 0. 0. 0. 0.]
为什么会这样?好吧,我想我有一个解释:因为我们处于多处理(而不是多线程)中,每个进程都是他自己的内存部分,即每个派生的进程都有他自己的 x
,它被销毁一次 random_add(x)
被终止。作为结论,主程序中的 x
从未真正更新过。
这是正确的吗?如果是这样,我该如何解决这个问题?简而言之,我需要一个全局化简操作,将所有 random_add
调用的结果相加,获得与串行版本相同的结果。
您应该在您的案例中使用共享内存对象:
from random import randint
import multiprocessing as mp
def random_add(x):
x[randint(0,len(x)-1)] += 1
if __name__ == "__main__":
print("Serial")
x = [0]*8
for i in range(100):
random_add(x)
print(x)
print("Parallel")
x = mp.Array('i', range(8))
processes = [mp.Process(target = random_add, args=(x,)) for i in range(100)]
for p in processes:
p.start()
print(x[:])
为了代码清晰,我已经将 numpy 数组更改为序数列表