Python Multiprocessing shared variables erratic behavior

As far as I understand, the simple code below should always print "0" at the end. However, when running it with "lock = True", it often prints other positive or negative numbers.

import multiprocessing as mp
import sys
import time

num = mp.Value('d', 0.0, lock=False)


def func1():
    global num
    print('start func1')
    # while num.value < 100000:
    for x in range(1000):
        num.value += 1
        # print(num.value)
    print('end func1')


def func2():
    global num
    print('start func2')
    # while num.value > -10000:
    for x in range(1000):
        num.value -= 1
        # print(num.value)
    print('end func2')


if __name__ == '__main__':
    ctx = mp.get_context('fork')
    p1 = ctx.Process(target=func1)
    p1.start()
    p2 = ctx.Process(target=func2)
    p2.start()
    p1.join()
    p2.join()
    sys.stdout.flush()
    time.sleep(25)
    print(num.value)

Can anyone explain this?

To clarify: when lock is set to "False", it behaves as expected and prints "0"; however, when it is set to "True", it often does not.

This is more noticeable/happens more often for larger values of 'range'.

Tested this on two platforms (Mac OS X and Ubuntu 14.04.01), both using Python 3.6.

The multiprocessing.Value docs are very explicit about this:

Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do

counter.value += 1

Assuming the associated lock is recursive (which it is by default) you can instead do

with counter.get_lock():
    counter.value += 1
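
Applied to the code from the question, a minimal corrected version could look like this (a sketch; note that get_lock() is only available when the Value is created with locking enabled, which is the default):

import multiprocessing as mp

num = mp.Value('d', 0.0)  # lock=True is the default


def func1():
    print('start func1')
    for x in range(1000):
        # Hold the lock across the whole read-modify-write, so no other
        # process can change num.value between the load and the store.
        with num.get_lock():
            num.value += 1
    print('end func1')


def func2():
    print('start func2')
    for x in range(1000):
        with num.get_lock():
            num.value -= 1
    print('end func2')


if __name__ == '__main__':
    ctx = mp.get_context('fork')
    p1 = ctx.Process(target=func1)
    p2 = ctx.Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num.value)  # always 0.0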

Regarding your comment: this is not "1000 increments". It's 1000 iterations of:

# Take lock on num.value
temp_value = num.value    # (1)
# release lock on num.value (anything can modify it now)
temp_value += 1           # (2)
# Take lock on num.value
num.value = temp_value    # (3)
# release lock on num.value

That is what it means when it says += is not atomic.

If num.value is modified by another process while step (2) runs, step (3) writes a stale value back to num.value.
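
You can also see the split at the bytecode level. A quick way to check (the exact opcodes vary between CPython versions, but the separate load and store are always there):

import dis

# num.value += 1 compiles to a LOAD_ATTR (read), an in-place add, and a
# STORE_ATTR (write) -- separate operations, not one atomic step.
dis.dis("num.value += 1")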


To illustrate a better way to handle what you're doing, here's a version using queues that keeps everything ticking along in lock step:

import multiprocessing as mp
import queue
import sys


# An increment process. Takes a value, increments it, passes it along
def func1(in_queue: mp.Queue, out_queue: mp.Queue):
    print('start func1')

    for x in range(1000):
        n = in_queue.get()
        n += 1
        print("inc", n)
        out_queue.put(n)
    print('end func1')


# A decrement process. Takes a value, decrements it, passes it along
def func2(in_queue: mp.Queue, out_queue: mp.Queue):
    print('start func2')
    for x in range(1000):
        n = in_queue.get()
        n -= 1
        print("dec", n)
        out_queue.put(n)
    print('end func2')


if __name__ == '__main__':
    ctx = mp.get_context('fork')

    queue1 = ctx.Queue()
    queue2 = ctx.Queue()

    # Make two processes and tie their queues back to back. They hand a value
    # back and forth until they've run their course.
    p1 = ctx.Process(target=func1, args=(queue1, queue2,))
    p1.start()
    p2 = ctx.Process(target=func2, args=(queue2, queue1,))
    p2.start()

    # Get it started
    queue1.put(0)

    # Wait for them to finish
    p1.join()
    p2.join()

    # Since this is a looping process, the result is on the queue we put() to.
    # (Using block=False because I'd rather throw an exception if something
    # went wrong rather than deadlock.)
    num = queue1.get(block=False)

    print("FINAL=%d" % num)

This is a very simplistic example. In more robust code you need to consider what happens in failure cases. For example, if p1 throws an exception, p2 will deadlock waiting for its value. In many ways that's a good thing, because it means you can recover the system by starting a new p1 process with the same queues. This way of dealing with concurrency is called the Actor model, if you want to study it further.
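
One easy hardening step for that failure case (a sketch of my own, not part of the version above; the 5-second timeout is an arbitrary choice) is to give the blocking get() a timeout, so a dead peer surfaces as an exception instead of a silent hang:

import queue

def func1(in_queue, out_queue):
    print('start func1')
    for x in range(1000):
        try:
            # Fail loudly if no value arrives within 5 seconds (e.g. the
            # peer process died) instead of blocking forever.
            n = in_queue.get(timeout=5)
        except queue.Empty:
            print('func1: peer went silent, giving up')
            return
        out_queue.put(n + 1)
    print('end func1')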