Python 在多处理进程之间共享双端队列

Python sharing a deque between multiprocessing processes

我一直在看以下问题,但没有任何运气:

Python sharing a dictionary between parallel processes

multiprocessing: sharing a large read-only object between processes?

multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes

我已经编写了一个非常基本的测试文件来说明我正在尝试做什么:

from collections import deque
from multiprocessing import Process
import numpy as np


class TestClass:
    def __init__(self):
        self.mem = deque(maxlen=4)
        self.process = Process(target=self.run)

    def run(self):
        while True:
            self.mem.append(np.array([0, 1, 2, 3, 4]))


def print_values(x):
    while True:
        print(x)


test = TestClass()
process = Process(target=print_values(test.mem))

test.process.start()
process.start()

目前输出如下:

deque([], maxlen=4)

如何从运行 "print_values" 的主代码或进程访问内存值?

遗憾的是 multiprocessing.Manager() 不支持 deque 但它确实适用于 listdictQueueValueArraylist 非常接近,所以我在下面的示例中使用了它..

from multiprocessing import Process, Manager, Lock
import numpy as np

class TestClass:
    def __init__(self):
        self.maxlen = 4
        self.manager = Manager()
        self.mem = self.manager.list()
        self.lock = self.manager.Lock()
        self.process = Process(target=self.run, args=(self.mem, self.lock))

    def run(self, mem, lock):
        while True:
            array = np.random.randint(0, high=10, size=5)
            with lock:
                if len(mem) >= self.maxlen:
                    mem.pop(0)
                mem.append(array)

def print_values(mem, lock):
    while True:
        with lock:
            print mem

test = TestClass()
print_process = Process(target=print_values, args=(test.mem, test.lock))
test.process.start()
print_process.start()

test.process.join()
print_process.join()

你必须小心使用管理器对象。您可以像使用它们引用的对象一样使用它们,但是您不能像... mem = mem[-4:] 那样截断值,因为您正在更改引用的对象。

至于编码风格,我可能会将 Manager 对象移到 class 之外,或者将 print_values 函数移到其中,但举个例子,这是可行的。如果你移动东西,请注意你不能在 run 方法中直接使用 self.mem。您需要在启动进程时传入它,否则 python 在后台执行的 fork 将创建一个新实例并且不会共享。

希望这适合你的情况,如果不适合,我们可以尝试稍微调整一下。

因此,结合@bivouac0 提供的代码和@Marijn Pieters 发表的评论,我想出了以下解决方案:

from multiprocessing import Process, Manager, Queue


class testClass:
    def __init__(self, maxlen=4):
        self.mem = Queue(maxsize=maxlen)
        self.process = Process(target=self.run)

    def run(self):
        i = 0

        while True:
            self.mem.empty()
            while not self.mem.full():
                self.mem.put(i)
                i += 1


def print_values(queue):
    while True:
        values = queue.get()
        print(values)


if __name__ == "__main__":
    test = testClass()
    print_process = Process(target=print_values, args=(test.mem,))

    test.process.start()
    print_process.start()

    test.process.join()
    print_process.join()