Python 多处理池/进程的结果不一致
Python multiprocessing Pool / Process has inconsistent results
我的共享字典对象的条目数不一致。它应该有 500,但大多数测试最终在 450 到 465 之间。我也尝试使用 map
和 Process
而不是 apply_async
。
map
稍微好一些,因为共享词典有大约 480 个条目而不是大约 450 个条目,但它仍然不一致,并不是预期的全部 500 个条目。
我也尝试过使用 Process,但这导致我的共享词典中的条目数量最少——大约 420。
这是使用 apply_async
的完整代码:
import numpy as np
from PIL import Image
from os import listdir
from multiprocessing import Manager, Pool
def processImage(path, d):
image = np.array(Image.open(source + "/" + path))
# Copy lists from shared dictionary since updates don't work otherwise
w = d["width"]
h = d["height"]
w.append(image.shape[0])
h.append(image.shape[1])
d["width"] = w
d["height"] = h
if __name__ == "__main__":
source = "./sample/images"
p = Pool()
m = Manager()
d = m.dict()
d["width"], d["height"] = [], []
for path in listdir(source):
p.apply_async(processImage, (path, d))
p.close()
p.join()
这是使用 map
的完整代码:
def processImage(obj):
image = np.array(Image.open(source + "/" + obj[1]))
w = obj[0]["width"]
h = obj[0]["height"]
w.append(image.shape[0])
h.append(image.shape[1])
obj[0]["width"] = w
obj[0]["height"] = h
if __name__ == "__main__":
source = "./sample/images"
p = Pool()
m = Manager()
d = m.dict()
d["width"], d["height"] = [], []
p.map(processImage, zip(itertools.repeat(d), listdir(source)))
这是使用 Process
的完整代码:
def processImage(path, d):
image = np.array(Image.open(source + "/" + path))
w = d["width"]
h = d["height"]
w.append(image.shape[0])
h.append(image.shape[1])
d["width"] = w
d["height"] = h
if __name__ == "__main__":
source = "./sample/images"
p = Pool()
m = Manager()
d = m.dict()
d["width"], d["height"] = [], []
jobs = []
for img in listdir(source):
p = Process(target=processImage, args=(img, d))
p.start()
jobs.append(p)
for j in jobs:
j.join()
这是竞争条件的典型例子。您需要某种同步原语来更新 d
.
考虑以下情况:有两个线程(在您的情况下是子进程)正在执行 processImage
。第一次得到w
和h
,第二次得到w
和h
。首先向两者附加一些内容并将其放回 d
。第二个对它自己的 w
和 h
做了一些事情,它们不再考虑第一个线程所做的更改,并将其放回 d
。此时,第一个线程所做的更改将丢失。
要解决此问题,您需要保护使用 d
:
的部分代码
from multiprocessing import Manager, Pool, Lock
...
lock = Lock()
...
def processImage(path, d):
image = np.array(Image.open(source + "/" + path))
lock.acquire()
d["width"].append(image.shape[0])
d["height"].append(image.shape[1])
lock.release()
我的共享字典对象的条目数不一致。它应该有 500,但大多数测试最终在 450 到 465 之间。我也尝试使用 map
和 Process
而不是 apply_async
。
map
稍微好一些,因为共享词典有大约 480 个条目而不是大约 450 个条目,但它仍然不一致,并不是预期的全部 500 个条目。
我也尝试过使用 Process,但这导致我的共享词典中的条目数量最少——大约 420。
这是使用 apply_async
的完整代码:
import numpy as np
from PIL import Image
from os import listdir
from multiprocessing import Manager, Pool
def processImage(path, d):
image = np.array(Image.open(source + "/" + path))
# Copy lists from shared dictionary since updates don't work otherwise
w = d["width"]
h = d["height"]
w.append(image.shape[0])
h.append(image.shape[1])
d["width"] = w
d["height"] = h
if __name__ == "__main__":
source = "./sample/images"
p = Pool()
m = Manager()
d = m.dict()
d["width"], d["height"] = [], []
for path in listdir(source):
p.apply_async(processImage, (path, d))
p.close()
p.join()
这是使用 map
的完整代码:
def processImage(obj):
image = np.array(Image.open(source + "/" + obj[1]))
w = obj[0]["width"]
h = obj[0]["height"]
w.append(image.shape[0])
h.append(image.shape[1])
obj[0]["width"] = w
obj[0]["height"] = h
if __name__ == "__main__":
source = "./sample/images"
p = Pool()
m = Manager()
d = m.dict()
d["width"], d["height"] = [], []
p.map(processImage, zip(itertools.repeat(d), listdir(source)))
这是使用 Process
的完整代码:
def processImage(path, d):
image = np.array(Image.open(source + "/" + path))
w = d["width"]
h = d["height"]
w.append(image.shape[0])
h.append(image.shape[1])
d["width"] = w
d["height"] = h
if __name__ == "__main__":
source = "./sample/images"
p = Pool()
m = Manager()
d = m.dict()
d["width"], d["height"] = [], []
jobs = []
for img in listdir(source):
p = Process(target=processImage, args=(img, d))
p.start()
jobs.append(p)
for j in jobs:
j.join()
这是竞争条件的典型例子。您需要某种同步原语来更新 d
.
考虑以下情况:有两个线程(在您的情况下是子进程)正在执行 processImage
。第一次得到w
和h
,第二次得到w
和h
。首先向两者附加一些内容并将其放回 d
。第二个对它自己的 w
和 h
做了一些事情,它们不再考虑第一个线程所做的更改,并将其放回 d
。此时,第一个线程所做的更改将丢失。
要解决此问题,您需要保护使用 d
:
from multiprocessing import Manager, Pool, Lock
...
lock = Lock()
...
def processImage(path, d):
image = np.array(Image.open(source + "/" + path))
lock.acquire()
d["width"].append(image.shape[0])
d["height"].append(image.shape[1])
lock.release()