与线程和进程共享管理器列表创建单独的副本

Sharing manager list with threads and process creates seperate copies

我想在进程和线程之间共享一个列表。所以我找到了用于创建列表的 multiprocessing.Manager() 。我写了下面这段代码:

import multiprocessing
import threading
import time

def consumer(l):
    counter = 0
    while counter < 20:
        time.sleep(5)
        l = l[len(l)/2:]
        print "consumer l: {}".format(l)
        counter += 5

def producer(l):
    for i in range(10):
        time.sleep(i)
        l.append(i)
        print "producer l: {}".format(l)

if __name__=="__main__":
    mgr = multiprocessing.Manager()
    l = mgr.list()

    p = threading.Thread(target=producer, args=(l,))
    c = multiprocessing.Process(target=consumer, args=(l,))

    p.start()
    c.start()

    p.join()
    c.join()
    print "done"

它创建一个进程和一个线程共享一个由管理器创建的列表。生产者每 i 秒后追加到列表中,而消费者每 5 秒将列表减半。我预计列表会相同,并且都将在其上运行。但我观察到消费者和生产者只共享列表一次,下次列表分开时。

观察到的输出:

producer l: [0]
producer l: [0, 1]
producer l: [0, 1, 2]
consumer l: [1, 2]
producer l: [0, 1, 2, 3]
producer l: [0, 1, 2, 3, 4]
consumer l: [2]
producer l: [0, 1, 2, 3, 4, 5]
consumer l: [2]
consumer l: [2]
producer l: [0, 1, 2, 3, 4, 5, 6]
producer l: [0, 1, 2, 3, 4, 5, 6, 7]
producer l: [0, 1, 2, 3, 4, 5, 6, 7, 8]
producer l: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
done

应该怎么做才能让他们始终使用相同的列表?

你的问题是这一行:

    l = l[len(l)/2:]

它看起来不错,但它实际上做的是它创建了一个新的l副本并将l分配给这个新对象。这意味着变量在您的第一次消费者执行时发散,消费者中的 l 不再绑定到传递给它的 l

考虑这个版本的消费者:

def consumer(l):
    counter = 0
    while counter < 20:
        time.sleep(5)
        n = len(l)/2
        for _ in range(0,n+1):
            try:
                l.pop()
            except IndexError:
                pass
        print "consumer l: {}".format(l)
        counter += 5

它不优雅,可能不是您要找的东西,但我已经用列表操作方法(在本例中为 pop)替换了将 l 分配给新实例。只要您继续使用 popappendinsertremove 等列表管理方法修改您的 l,就可以了。

注意,如果您将 l 分配给新的东西,您的制作人会遇到同样的问题,但您已经在那里 l.append(),所以那部分没问题。

我把程序改为:

import multiprocessing
import threading
import time

def consumer(l):
    counter = 0
    while counter < 20:
        time.sleep(5)
        del l[0 : len(l)/2+1]                                                                                                                                                   
        print("consumer l address {}".format(hex(id(l))))
        print "consumer l: {}".format(l)
        counter += 5

def producer(l):
    for i in range(10):
        time.sleep(i)
        print("producer l address {}".format(hex(id(l))))
        l.append(i)
        print "producer l: {}".format(l)

if __name__=="__main__":
    mgr = multiprocessing.Manager()
    l = mgr.list()

    print("list address {}".format(hex(id(l))))
    p = multiprocessing.Process(target=producer, args=(l,))
    c = multiprocessing.Process(target=consumer, args=(l,))

    p.start()
    c.start()

    p.join()
    c.join()
    print "done"

请注意,我使用了 del 而不是创建新列表并将其分配给旧列表。我还在上面的代码中添加了地址打印语句。如果你按原样 运行 ,你应该看到列表的地址在任何地方都保持不变,因为我们没有重新创建列表。如果将第 9 行更改回 l = l[len(l)/2:] 和 运行 程序,您应该注意到消费者和生产者列表的地址在第一次操作后发生了变化。