Python Multiprocessing shared variables erratic behavior
As far as I understand, the simple code below should always print "0" at the end. However, when running it with lock = True, it frequently prints out other positive or negative numbers.
import multiprocessing as mp
import sys
import time

num = mp.Value('d', 0.0, lock = False)

def func1():
    global num
    print ('start func1')
    #while num.value < 100000:
    for x in range(1000):
        num.value += 1
        #print(num.value)
    print ('end func1')

def func2():
    global num
    print ('start func2')
    #while num.value > -10000:
    for x in range(1000):
        num.value -= 1
        #print(num.value)
    print ('end func2')

if __name__=='__main__':
    ctx = mp.get_context('fork')
    p1 = ctx.Process(target=func1)
    p1.start()
    p2 = ctx.Process(target=func2)
    p2.start()
    p1.join()
    p2.join()
    sys.stdout.flush()
    time.sleep(25)
    print(num.value)
Can anyone explain this?

To clarify: when lock is set to "False" it behaves as expected and prints "0"; however, when it is set to "True" it often does not.

This is more noticeable / happens more often for larger values of 'range'.

Tested this on two platforms (Mac OS X and Ubuntu 14.04.01), both with Python 3.6.
The docs for multiprocessing.Value are quite explicit about this:
Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do
counter.value += 1
Assuming the associated lock is recursive (which it is by default) you can instead do
with counter.get_lock():
counter.value += 1
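Applied to the code in the question, the fix looks roughly like the sketch below. Note this is my adaptation, not the original poster's code, and num must be created with the default lock=True (or an explicit lock), otherwise there is no get_lock() to call:

import multiprocessing as mp

# Default lock=True, so num carries an RLock accessible via get_lock()
num = mp.Value('d', 0.0)

def func1():
    print('start func1')
    for x in range(1000):
        with num.get_lock():  # the whole read-modify-write happens under the lock
            num.value += 1
    print('end func1')

def func2():
    print('start func2')
    for x in range(1000):
        with num.get_lock():
            num.value -= 1
    print('end func2')

if __name__ == '__main__':
    ctx = mp.get_context('fork')
    p1 = ctx.Process(target=func1)
    p2 = ctx.Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num.value)  # now reliably 0.0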
Based on your comment, this is not "1000 increments". It is 1000 iterations of:
# Take lock on num.value
temp_value = num.value # (1)
# release lock on num.value (anything can modify it now)
temp_value += 1 # (2)
# Take lock on num.value
num.value = temp_value # (3)
# release lock on num.value
This is what it means when it says += is not atomic. If num.value is modified by another process during step (2), then step (3) writes a stale, incorrect value back to num.value.
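To make that concrete, here is one hypothetical interleaving (starting from num.value == 5) in which an increment is lost:

# Process running func1            # Process running func2
temp = num.value   # reads 5
                                   temp = num.value   # also reads 5
temp += 1          # temp == 6
                                   temp -= 1          # temp == 4
num.value = temp   # writes 6
                                   num.value = temp   # writes 4; the +1 is lost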
As an example of a better way to handle what you're doing, here's a version that uses Queues and ensures everything stays ticking along in lockstep:
import multiprocessing as mp
import queue
import sys

# An increment process. Takes a value, increments it, passes it along
def func1(in_queue: mp.Queue, out_queue: mp.Queue):
    print('start func1')
    for x in range(1000):
        n = in_queue.get()
        n += 1
        print("inc", n)
        out_queue.put(n)
    print('end func1')

# A decrement process. Takes a value, decrements it, passes it along
def func2(in_queue: mp.Queue, out_queue: mp.Queue):
    print('start func2')
    for x in range(1000):
        n = in_queue.get()
        n -= 1
        print("dec", n)
        out_queue.put(n)
    print('end func2')

if __name__ == '__main__':
    ctx = mp.get_context('fork')
    queue1 = mp.Queue()
    queue2 = mp.Queue()
    # Make two processes and tie their queues back to back. They hand a value
    # back and forth until they've run their course.
    p1 = ctx.Process(target=func1, args=(queue1, queue2,))
    p1.start()
    p2 = ctx.Process(target=func2, args=(queue2, queue1,))
    p2.start()
    # Get it started
    queue1.put(0)
    # Wait for them to finish
    p1.join()
    p2.join()
    # Since this is a looping process, the result is on the queue we put() to.
    # (Using block=False because I'd rather throw an exception if something
    # went wrong rather than deadlock.)
    num = queue1.get(block=False)
    print("FINAL=%d" % num)
This is a very simple example. In more robust code you would need to consider what happens in failure cases. For example, if p1 throws an exception, p2 will deadlock waiting for its value. In many ways that is a good thing, because it means you can recover the system by starting a new p1 process with the same queues. If you want to study this way of dealing with concurrency further, it is called the Actor model.
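If you do want a guard against that deadlock, one minimal sketch is to put a timeout on get() so a worker notices that its peer has died instead of blocking forever. The 5-second threshold here is an arbitrary assumption, not something from the original code:

import multiprocessing as mp
import queue  # provides the queue.Empty exception that a timed-out get() raises

def func1(in_queue: mp.Queue, out_queue: mp.Queue):
    for x in range(1000):
        try:
            n = in_queue.get(timeout=5)  # assumed stall threshold
        except queue.Empty:
            print('func1: no value arrived in 5s; peer likely died, exiting')
            return
        out_queue.put(n + 1)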