多处理值因锁定而挂起
multiprocessing value hangs with lock
我已阅读文档 here,似乎要确保值不会挂起,我们需要使用锁。我就是这么做的,但它仍然卡住了:
from multiprocessing import Process, Value, freeze_support, Lock
nb_threads = 3
nbloops = 10
v = Value('i', 0)
def run_process(lock):
global nbloops
i = 0
while i < nbloops:
# do stuff
i += 1
with lock:
v.value += 1
# wait for all the processes to finish doing something
while v.value % nb_threads != 0:
pass
if __name__ == '__main__':
freeze_support()
processes = []
lock = Lock()
for i in range(0, 3):
processes.append( Process( target=run_process, args=(lock,) ) )
for process in processes:
process.start()
for process in processes:
process.join()
我已经尝试使用锁访问该值,但它仍然阻塞:
val = -1
while val % nb_threads != 0:
with lock:
val = v.value
我该如何解决这个问题?谢谢
您的代码存在竞争条件;您不保证所有三个进程在允许它们继续前进之前都脱离 while v.value % nb_threads != 0
循环。这允许一个或两个进程继续进行 while i < nbloops
循环的下一次迭代,增加 v.value
,然后防止剩余的 process/processes 突破它们自己的 while v.value % nb_threads != 0
循环。您尝试执行的同步类型最好由 Barrier
处理,而不是循环并反复检查值。
此外,multiprocessing.Value
默认也有一个内置的同步,你可以通过调用Value.get_lock
显式访问它使用的Lock
,所以没有必要为每个进程显式设置一个 Lock
。放在一起,你有:
from multiprocessing import Process, Value, freeze_support, Lock, Barrier
nb_threads = 3
nbloops = 10
v = Value('i', 0)
def run_process(barrier):
global nbloops
i = 0
while i < nbloops:
# do stuff
i += 1
with v.get_lock():
v.value += 1
# wait for all the processes to finish doing something
out = barrier.wait()
if __name__ == '__main__':
freeze_support()
processes = []
b = Barrier(nb_threads)
for i in range(0, nb_threads):
processes.append( Process( target=run_process, args=(b,) ) )
for process in processes:
process.start()
for process in processes:
process.join()
Barrier
保证在所有进程都调用 Barrier.wait()
之前,没有进程可以继续进行循环的下一次迭代,此时所有三个进程都能够同时进行。 Barrier
对象支持重用,因此可以在每次迭代时安全地调用它。
我已阅读文档 here,似乎要确保值不会挂起,我们需要使用锁。我就是这么做的,但它仍然卡住了:
from multiprocessing import Process, Value, freeze_support, Lock
nb_threads = 3
nbloops = 10
v = Value('i', 0)
def run_process(lock):
global nbloops
i = 0
while i < nbloops:
# do stuff
i += 1
with lock:
v.value += 1
# wait for all the processes to finish doing something
while v.value % nb_threads != 0:
pass
if __name__ == '__main__':
freeze_support()
processes = []
lock = Lock()
for i in range(0, 3):
processes.append( Process( target=run_process, args=(lock,) ) )
for process in processes:
process.start()
for process in processes:
process.join()
我已经尝试使用锁访问该值,但它仍然阻塞:
val = -1
while val % nb_threads != 0:
with lock:
val = v.value
我该如何解决这个问题?谢谢
您的代码存在竞争条件;您不保证所有三个进程在允许它们继续前进之前都脱离 while v.value % nb_threads != 0
循环。这允许一个或两个进程继续进行 while i < nbloops
循环的下一次迭代,增加 v.value
,然后防止剩余的 process/processes 突破它们自己的 while v.value % nb_threads != 0
循环。您尝试执行的同步类型最好由 Barrier
处理,而不是循环并反复检查值。
此外,multiprocessing.Value
默认也有一个内置的同步,你可以通过调用Value.get_lock
显式访问它使用的Lock
,所以没有必要为每个进程显式设置一个 Lock
。放在一起,你有:
from multiprocessing import Process, Value, freeze_support, Lock, Barrier
nb_threads = 3
nbloops = 10
v = Value('i', 0)
def run_process(barrier):
global nbloops
i = 0
while i < nbloops:
# do stuff
i += 1
with v.get_lock():
v.value += 1
# wait for all the processes to finish doing something
out = barrier.wait()
if __name__ == '__main__':
freeze_support()
processes = []
b = Barrier(nb_threads)
for i in range(0, nb_threads):
processes.append( Process( target=run_process, args=(b,) ) )
for process in processes:
process.start()
for process in processes:
process.join()
Barrier
保证在所有进程都调用 Barrier.wait()
之前,没有进程可以继续进行循环的下一次迭代,此时所有三个进程都能够同时进行。 Barrier
对象支持重用,因此可以在每次迭代时安全地调用它。