Python 在多处理进程之间共享双端队列
Python sharing a deque between multiprocessing processes
我一直在看以下问题,但没有任何运气:
Python sharing a dictionary between parallel processes
multiprocessing: sharing a large read-only object between processes?
multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes
我已经编写了一个非常基本的测试文件来说明我正在尝试做什么:
from collections import deque
from multiprocessing import Process
import numpy as np
class TestClass:
def __init__(self):
self.mem = deque(maxlen=4)
self.process = Process(target=self.run)
def run(self):
while True:
self.mem.append(np.array([0, 1, 2, 3, 4]))
def print_values(x):
while True:
print(x)
test = TestClass()
process = Process(target=print_values(test.mem))
test.process.start()
process.start()
目前输出如下:
deque([], maxlen=4)
如何从运行 "print_values" 的主代码或进程访问内存值?
遗憾的是 multiprocessing.Manager()
不支持 deque
但它确实适用于 list
、dict
、Queue
、Value
和 Array
。 list
非常接近,所以我在下面的示例中使用了它..
from multiprocessing import Process, Manager, Lock
import numpy as np
class TestClass:
def __init__(self):
self.maxlen = 4
self.manager = Manager()
self.mem = self.manager.list()
self.lock = self.manager.Lock()
self.process = Process(target=self.run, args=(self.mem, self.lock))
def run(self, mem, lock):
while True:
array = np.random.randint(0, high=10, size=5)
with lock:
if len(mem) >= self.maxlen:
mem.pop(0)
mem.append(array)
def print_values(mem, lock):
while True:
with lock:
print mem
test = TestClass()
print_process = Process(target=print_values, args=(test.mem, test.lock))
test.process.start()
print_process.start()
test.process.join()
print_process.join()
你必须小心使用管理器对象。您可以像使用它们引用的对象一样使用它们,但是您不能像... mem = mem[-4:]
那样截断值,因为您正在更改引用的对象。
至于编码风格,我可能会将 Manager
对象移到 class 之外,或者将 print_values
函数移到其中,但举个例子,这是可行的。如果你移动东西,请注意你不能在 run
方法中直接使用 self.mem
。您需要在启动进程时传入它,否则 python 在后台执行的 fork
将创建一个新实例并且不会共享。
希望这适合你的情况,如果不适合,我们可以尝试稍微调整一下。
因此,结合@bivouac0 提供的代码和@Marijn Pieters 发表的评论,我想出了以下解决方案:
from multiprocessing import Process, Manager, Queue
class testClass:
def __init__(self, maxlen=4):
self.mem = Queue(maxsize=maxlen)
self.process = Process(target=self.run)
def run(self):
i = 0
while True:
self.mem.empty()
while not self.mem.full():
self.mem.put(i)
i += 1
def print_values(queue):
while True:
values = queue.get()
print(values)
if __name__ == "__main__":
test = testClass()
print_process = Process(target=print_values, args=(test.mem,))
test.process.start()
print_process.start()
test.process.join()
print_process.join()
我一直在看以下问题,但没有任何运气:
Python sharing a dictionary between parallel processes
multiprocessing: sharing a large read-only object between processes?
multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes
我已经编写了一个非常基本的测试文件来说明我正在尝试做什么:
from collections import deque
from multiprocessing import Process
import numpy as np
class TestClass:
def __init__(self):
self.mem = deque(maxlen=4)
self.process = Process(target=self.run)
def run(self):
while True:
self.mem.append(np.array([0, 1, 2, 3, 4]))
def print_values(x):
while True:
print(x)
test = TestClass()
process = Process(target=print_values(test.mem))
test.process.start()
process.start()
目前输出如下:
deque([], maxlen=4)
如何从运行 "print_values" 的主代码或进程访问内存值?
遗憾的是 multiprocessing.Manager()
不支持 deque
但它确实适用于 list
、dict
、Queue
、Value
和 Array
。 list
非常接近,所以我在下面的示例中使用了它..
from multiprocessing import Process, Manager, Lock
import numpy as np
class TestClass:
def __init__(self):
self.maxlen = 4
self.manager = Manager()
self.mem = self.manager.list()
self.lock = self.manager.Lock()
self.process = Process(target=self.run, args=(self.mem, self.lock))
def run(self, mem, lock):
while True:
array = np.random.randint(0, high=10, size=5)
with lock:
if len(mem) >= self.maxlen:
mem.pop(0)
mem.append(array)
def print_values(mem, lock):
while True:
with lock:
print mem
test = TestClass()
print_process = Process(target=print_values, args=(test.mem, test.lock))
test.process.start()
print_process.start()
test.process.join()
print_process.join()
你必须小心使用管理器对象。您可以像使用它们引用的对象一样使用它们,但是您不能像... mem = mem[-4:]
那样截断值,因为您正在更改引用的对象。
至于编码风格,我可能会将 Manager
对象移到 class 之外,或者将 print_values
函数移到其中,但举个例子,这是可行的。如果你移动东西,请注意你不能在 run
方法中直接使用 self.mem
。您需要在启动进程时传入它,否则 python 在后台执行的 fork
将创建一个新实例并且不会共享。
希望这适合你的情况,如果不适合,我们可以尝试稍微调整一下。
因此,结合@bivouac0 提供的代码和@Marijn Pieters 发表的评论,我想出了以下解决方案:
from multiprocessing import Process, Manager, Queue
class testClass:
def __init__(self, maxlen=4):
self.mem = Queue(maxsize=maxlen)
self.process = Process(target=self.run)
def run(self):
i = 0
while True:
self.mem.empty()
while not self.mem.full():
self.mem.put(i)
i += 1
def print_values(queue):
while True:
values = queue.get()
print(values)
if __name__ == "__main__":
test = testClass()
print_process = Process(target=print_values, args=(test.mem,))
test.process.start()
print_process.start()
test.process.join()
print_process.join()