python3 多进程共享 numpy 数组(read-only)

python3 multiprocess shared numpy array(read-only)

我不确定这个标题是否适合我的情况:我想分享 numpy 数组的原因是它可能是我案例的潜在解决方案之一,但如果您有其他解决方案也要乖

我的任务:我需要使用 multiprocessing 实现 iterative 算法,而这些进程中的每一个都需要有一份数据副本(这个数据很大,并且read-only,在迭代算法过程中不会改变)。

我写了一些伪代码来证明我的想法:

import multiprocessing


def worker_func(data, args):
    # do sth...
    return res

def compute(data, process_num, niter):
    data
    result = []
    args = init()

    for iter in range(niter):
        args_chunk = split_args(args, process_num)
        pool = multiprocessing.Pool()
        for i in range(process_num):
            result.append(pool.apply_async(worker_func,(data, args_chunk[i])))
        pool.close()
        pool.join()
        # aggregate result and update args
        for res in result:
            args = update_args(res.get())

if __name__ == "__main__":
    compute(data, 4, 100)

问题是在每次迭代中,我都必须将数据传递给子进程,这非常time-consuming。

我提出了两个可能的解决方案:

  1. 在进程之间共享数据(它是 ndarray),这就是这个问题的标题。
  2. 让子进程保持活动状态,例如守护进程或其他...并等待调用。这样一来,我只需要在一开始就传递数据。

那么,有什么方法可以在进程之间共享一个 read-only numpy 数组吗?或者,如果您很好地实施了解决方案 2,它也可以。

提前致谢。

从概念上讲,对于您的问题,使用 mmap 是一种标准方法。 这样,多个进程就可以从映射内存中检索信息

对mmap的基本了解:

https://en.wikipedia.org/wiki/Mmap

Python 有 "mmap" 个模块(import mmap)

python 标准的文档和一些示例如下 link

https://docs.python.org/2/library/mmap.html

如果你绝对必须使用 Python 多处理,那么你可以使用 Python 多处理和 Arrow's Plasma object store to store the object in shared memory and access it from each of the workers. See this example,它使用 Pandas 数据帧而不是一个 numpy 数组。

如果您不是绝对需要使用 Python 多处理,使用 Ray 可以更轻松地完成此操作。 Ray 的一个优点是它开箱即用,不仅适用于数组,还适用于 Python 包含数组的对象。

在幕后,Ray 使用 Apache Arrow, which is a zero-copy data layout, and stores the result in Arrow's Plasma object store. This allows worker tasks to have read-only access to the objects without creating their own copies. You can read more about how this works.

序列化 Python 个对象

这是运行的示例的修改版本。

import numpy as np
import ray

ray.init()

@ray.remote
def worker_func(data, i):
    # Do work. This function will have read-only access to
    # the data array.
    return 0

data = np.zeros(10**7)
# Store the large array in shared memory once so that it can be accessed
# by the worker tasks without creating copies.
data_id = ray.put(data)

# Run worker_func 10 times in parallel. This will not create any copies
# of the array. The tasks will run in separate processes.
result_ids = []
for i in range(10):
    result_ids.append(worker_func.remote(data_id, i))

# Get the results.
results = ray.get(result_ids)

请注意,如果我们省略行 data_id = ray.put(data) 而改为调用 worker_func.remote(data, i),则每次函数调用时 data 数组将存储在共享内存中一次,这将是低效的。通过首先调用 ray.put,我们可以将对象存储在对象存储中一次。