Python multiprocessing - Reading from / writing to multiple 2D arrays
I'm trying to speed up a heavy computation on large numpy arrays by applying this tutorial here to my use case. In principle, I have one input array and one result array, and I want to share both across many processes in which data is read from the input array, adjusted, and then written to the output array. I don't think I need locks, since the array indices used for reading and writing are unique to each process.
Here is my test example, based on the linked tutorial:
import numpy as np
import multiprocessing as mp

WORKER_DICT = dict()

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value:
        np.copyto(shared_array, np.full_like(data_array, init_value))
        return raw_array, shared_array
    else:
        np.copyto(shared_array, data_array)
        return raw_array, shared_array

def init_worker(data_array, result_array):
    WORKER_DICT['data_array'] = data_array
    WORKER_DICT['result_array'] = result_array
    WORKER_DICT['shape'] = data_array.shape

def worker(i, j):
    data = np.frombuffer(WORKER_DICT['data_array']).reshape(WORKER_DICT['shape'])
    result = np.frombuffer(WORKER_DICT['worker_array']).reshape(WORKER_DICT['shape'])
    result[i, j] = np.multiply(data[i, j], 2)
    return

if __name__ == '__main__':
    sh_in_arr, shared_input_array = shared_array_from_np_array(np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]]))
    sh_res_arr, shared_result_array = shared_array_from_np_array(shared_input_array, 0)
    init_args = (sh_in_arr, sh_res_arr)
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map_async(worker, range(shared_input_array.shape[0]))
    print('Input:', shared_input_array)
    print('Output:', shared_result_array)
When I run it, I get the same array back:
Input:
[[1 1 2 2]
[1 1 2 2]
[3 3 4 4]
[3 3 4 4]]
Output:
[[1 1 2 2]
[1 1 2 2]
[3 3 4 4]
[3 3 4 4]]
Am I on the right track, or is there something substantially wrong? Combine Pool.map with shared memory Array in Python multiprocessing looks much simpler, but I don't even understand the question asked there.
Edit: After a comment about the old Python version, I switched to Python 3.9 and added the actual output.
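In hindsight, several things in the first attempt explain the unchanged output: worker reads WORKER_DICT['worker_array'], but init_worker stores the buffer under 'result_array'; WORKER_DICT['shape'] is taken from the RawArray, which has no shape attribute; np.frombuffer defaults to float64 and silently misreads the integer buffers; worker(i, j) expects two arguments, but map_async passes a single index; and since the AsyncResult from map_async is never waited on (e.g. with .get()), every one of those errors is swallowed, and the with block terminates the pool on exit before the work can finish. On top of that, `if init_value:` is False for init_value=0, so the result array starts out as a copy of the input, which is exactly what gets printed. A minimal corrected worker, assuming init_worker also stores the original shape and dtype (the raw buffer carries neither), might look like this:

def worker(i):
    shape = WORKER_DICT['shape']
    dtype = WORKER_DICT['dtype']  # assumed to be stored by init_worker
    data = np.frombuffer(WORKER_DICT['data_array'], dtype=dtype).reshape(shape)
    result = np.frombuffer(WORKER_DICT['result_array'], dtype=dtype).reshape(shape)
    # One row per task: the indices stay unique per process, so no lock is needed.
    result[i, :] = data[i, :] * 2

Called with pool.map(worker, range(shape[0])) instead of map_async, the call blocks until all rows are done and any worker exception propagates to the parent.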
So after many failures and countless attempts, I came up with this, which seems to work:
import time
import numpy as np
from multiprocessing import Pool, RawArray, cpu_count

# A global dictionary storing the variables passed from the initializer.
GLOBAL_DICT = {}

def init_worker(input_array, array_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    GLOBAL_DICT['input_array'] = input_array
    GLOBAL_DICT['array_shape'] = array_shape

def worker_func(i):
    # Simply doubles all entries of row i.
    data = np.frombuffer(GLOBAL_DICT['input_array']).reshape(GLOBAL_DICT['array_shape'])
    return data[i, :] * 2

# We need this check on Windows to prevent infinitely spawning new child
# processes.
if __name__ == '__main__':
    start = time.time()
    my_array = np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]])
    array_shape = my_array.shape
    # Typecode 'd' allocates C doubles, so the shared array (and the result)
    # is float64 regardless of my_array's integer dtype.
    raw_array = RawArray('d', array_shape[0] * array_shape[1])
    # Wrap raw_array as a numpy array.
    shared_array = np.frombuffer(raw_array).reshape(array_shape)
    # Copy my_array into the shared array.
    np.copyto(shared_array, my_array)
    # Start the process pool and do the computation.
    args = (raw_array, array_shape)
    with Pool(processes=cpu_count() - 2, initializer=init_worker, initargs=args) as pool:
        result = pool.map(worker_func, range(array_shape[0]))
    print(f'Input:\n{my_array}')
    print(f'Result:\n{np.array(result)}')
This prints:
Input:
[[1 1 2 2]
[1 1 2 2]
[3 3 4 4]
[3 3 4 4]]
Result:
[[2. 2. 4. 4.]
[2. 2. 4. 4.]
[6. 6. 8. 8.]
[6. 6. 8. 8.]]
I suppose there are more efficient or prettier ways to do this. Ideally, I would like to write directly into a shared output array, but for now, it works.
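A minimal sketch of that direct-write variant, using a second RawArray for the result and writing each row in place (names like raw_result are illustrative):

import numpy as np
from multiprocessing import Pool, RawArray

WORKER_DICT = {}

def init_worker(raw_input, raw_result, shape):
    # Wrap the shared buffers once per worker process; both arrays are
    # float64 because the buffers were allocated with typecode 'd'.
    WORKER_DICT['input'] = np.frombuffer(raw_input).reshape(shape)
    WORKER_DICT['result'] = np.frombuffer(raw_result).reshape(shape)

def worker(i):
    # Each task owns exactly one row, so writes never overlap.
    WORKER_DICT['result'][i, :] = WORKER_DICT['input'][i, :] * 2

if __name__ == '__main__':
    my_array = np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]])
    shape = my_array.shape
    raw_input = RawArray('d', my_array.size)
    raw_result = RawArray('d', my_array.size)
    np.copyto(np.frombuffer(raw_input).reshape(shape), my_array)
    with Pool(processes=2, initializer=init_worker,
              initargs=(raw_input, raw_result, shape)) as pool:
        pool.map(worker, range(shape[0]))  # map blocks until all rows are done
    print('Result:\n', np.frombuffer(raw_result).reshape(shape))

Since each row is written by exactly one task, no lock is needed here; if tasks could ever touch the same indices, multiprocessing.Array (which bundles a lock) would be the safer choice.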