如何分配共享内存,以便父进程可以看到子进程中的写入?
How to allocate shared memory such that writes in child process will be seen by parent process?
背景:我正在编写一个程序,可以同时从多个摄像头读取视频帧。我想要 1 个执行帧读取的进程和第二个将这些帧写入磁盘的进程。我一直在尝试找出最好的方法(在 Python 3.6 中)使 "read" 进程中的帧可用于 "write" 进程进行保存。我已经将共享内存作为最佳选择。
问题: 当我为两个进程分配共享内存时,子进程看不到父进程在共享内存中所做的更改space。
之前的努力: 我一直在尝试 Is shared readonly data copied to different processes for multiprocessing? 建议的方法。但是,将此代码直接粘贴到 Atom 中并尝试在 Python 2.7 或 Python 3.6 下 运行 它不会产生与链接答案提供的相同结果(即父进程看不到my_func
在子进程中所做的更改)
注:
import multiprocessing
import ctypes
import numpy as np
shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)
# Parallel processing
def my_func(i, def_param=shared_array):
shared_array[i,:] = i
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
pool.map(my_func, range(10))
print shared_array
一直在玩一些应该与 Unix 和 Windows 兼容的东西。我想到的第一个想法是使用内存映射文件,因为这基本上是共享内存背后的机制
在 Python 下使用它们非常容易。例如,这是一个以这种方式创建 numpy 数组的函数:
import numpy as np
from ctypes import sizeof, c_double
from mmap import mmap, ACCESS_DEFAULT
def shared_array(
shape, path, mode='rb', *,
dtype=c_double, access=ACCESS_DEFAULT
):
for n in reversed(shape):
dtype *= n
with open(path, mode) as fd:
if fd.writable():
size = fd.seek(0, 2)
if size < sizeof(dtype):
fd.truncate(sizeof(dtype))
buf = mmap(fd.fileno(), sizeof(dtype), access=access)
return np.ctypeslib.as_array(
dtype.from_buffer(buf)
)
即创建一个具有给定形状(即 (len,)
或 (rows, cols)
)的 "shared numpy array",它出现在给定路径的文件系统中,并具有由 mode
提供的访问权限(即 rb
是只读的,r+b
是读写的)。
这可以与如下代码一起使用:
from multiprocessing import Pool, set_start_method
def fn(shape, path):
array = shared_array(shape, path, 'r+b')
np.fill_diagonal(array, 42)
def main():
shape = (5, 10)
path = 'tmp.buffer'
array = shared_array(shape, path, 'w+b')
with Pool() as pool:
pool.apply(fn, (shape, path))
print(array)
if __name__ == '__main__':
set_start_method('spawn')
main()
我使用 Linux,所以明确设置了一个 Windows 样式过程 spawn
ing 样式
背景:我正在编写一个程序,可以同时从多个摄像头读取视频帧。我想要 1 个执行帧读取的进程和第二个将这些帧写入磁盘的进程。我一直在尝试找出最好的方法(在 Python 3.6 中)使 "read" 进程中的帧可用于 "write" 进程进行保存。我已经将共享内存作为最佳选择。
问题: 当我为两个进程分配共享内存时,子进程看不到父进程在共享内存中所做的更改space。
之前的努力: 我一直在尝试 Is shared readonly data copied to different processes for multiprocessing? 建议的方法。但是,将此代码直接粘贴到 Atom 中并尝试在 Python 2.7 或 Python 3.6 下 运行 它不会产生与链接答案提供的相同结果(即父进程看不到my_func
在子进程中所做的更改)
注:
import multiprocessing
import ctypes
import numpy as np
shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)
# Parallel processing
def my_func(i, def_param=shared_array):
shared_array[i,:] = i
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
pool.map(my_func, range(10))
print shared_array
一直在玩一些应该与 Unix 和 Windows 兼容的东西。我想到的第一个想法是使用内存映射文件,因为这基本上是共享内存背后的机制
在 Python 下使用它们非常容易。例如,这是一个以这种方式创建 numpy 数组的函数:
import numpy as np
from ctypes import sizeof, c_double
from mmap import mmap, ACCESS_DEFAULT
def shared_array(
shape, path, mode='rb', *,
dtype=c_double, access=ACCESS_DEFAULT
):
for n in reversed(shape):
dtype *= n
with open(path, mode) as fd:
if fd.writable():
size = fd.seek(0, 2)
if size < sizeof(dtype):
fd.truncate(sizeof(dtype))
buf = mmap(fd.fileno(), sizeof(dtype), access=access)
return np.ctypeslib.as_array(
dtype.from_buffer(buf)
)
即创建一个具有给定形状(即 (len,)
或 (rows, cols)
)的 "shared numpy array",它出现在给定路径的文件系统中,并具有由 mode
提供的访问权限(即 rb
是只读的,r+b
是读写的)。
这可以与如下代码一起使用:
from multiprocessing import Pool, set_start_method
def fn(shape, path):
array = shared_array(shape, path, 'r+b')
np.fill_diagonal(array, 42)
def main():
shape = (5, 10)
path = 'tmp.buffer'
array = shared_array(shape, path, 'w+b')
with Pool() as pool:
pool.apply(fn, (shape, path))
print(array)
if __name__ == '__main__':
set_start_method('spawn')
main()
我使用 Linux,所以明确设置了一个 Windows 样式过程 spawn
ing 样式