如何使用共享内存而不是通过多个进程之间的酸洗来传递对象

How to use shared memory instead of passing objects via pickling between multiple processes

我正在研究一个以加法模型为中心的 CPU 强化 ML 问题。由于加法是主要操作,我可以将输入数据分成几部分并生成多个模型,然后通过覆盖的 __add__ 方法合并这些模型。

与多处理相关的代码如下所示:

def pool_worker(filename, doshuffle):
    print(f"Processing file: {filename}")
    with open(filename, 'r') as f:
        partial = FragmentModel(order=args.order, indata=f, shuffle=doshuffle)
        return partial

def generateModel(is_mock=False, save=True):
    model = None
    with ThreadPool(args.nthreads) as pool:
        from functools import partial
        partial_models = pool.imap_unordered(partial(pool_worker, doshuffle=is_mock), args.input)
        i = 0
        for m in partial_models:
            logger.info(f'Starting to merge model {i}')
            if model is None:
                import copy
                model = copy.deepcopy(m)
            else:
                model += m
            logger.info(f'Done merging...')
            i += 1

    return model

问题在于,随着模型阶数的增加,内存消耗呈指数级增长,因此在阶数 4 时,模型的每个实例大约为 4-5 GB,这会导致线程池崩溃,因为中间模型对象不再存在腌制的。

我读了一点,看起来即使酸洗不是问题,像这样传递数据仍然非常低效,正如对 this answer 的评论。

但是,关于如何为此目的使用共享内存的指导很少。是否可以在不必更改模型对象的内部结构的情况下避免此问题?

您应该为共享的可编辑对象使用 Manager 代理对象:https://docs.python.org/3/library/multiprocessing.html#multiprocessing-managers 访问锁将由该 Manager 代理对象处理。

Customized managers部分有一个例子,应该适合你:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

之后,您必须从不同的进程(如 using a remote manager 所示)连接到该管理器并根据需要对其进行编辑。

查看 ray project which is a distributed execution framework that makes use of apache arrow 进行序列化。如果您使用的是 numpy 数组,它会特别棒,因此是 ML 工作流程的绝佳工具。

这是 object serialization

上的文档片段

In Ray, we optimize for numpy arrays by using the Apache Arrow data format. When we deserialize a list of numpy arrays from the object store, we still create a Python list of numpy array objects. However, rather than copy each numpy array, each numpy array object holds a pointer to the relevant array held in shared memory. There are some advantages to this form of serialization.

  • Deserialization can be very fast.
  • Memory is shared between processes so worker processes can all read the same data without having to copy it.

在我看来,它比并行执行的多处理库更容易使用,尤其是在寻求使用共享内存时,tutorial.

中的用法介绍

使用文件!

不,真的,使用文件——它们很高效(OS 将缓存内容),并允许您处理更大的问题(数据集不必适合 RAM) .

使用 https://docs.scipy.org/doc/numpy-1.15.0/reference/routines.io.html 到 dump/load numpy 数组 to/from 文件中的任何一个,并且只在进程之间传递文件名。

P.S。基准序列化方法,取决于中间数组大小,最快的可能是 "raw"(无转换开销)或 "compressed"(如果文件最终被写入磁盘)或其他。 IIRC 加载 "raw" 文件可能需要提前了解数据格式(维度、大小)。

从Python 3.8 开始,有multiprocessing.shared_memory 可以在进程之间直接共享内存,类似于C 或Java 中“真正的”多线程。直接内存共享比通过文件、套接字或数据复制共享要快得多 serialization/deserialization.

它的工作原理是提供一个共享内存缓冲区,不同进程可以通过基本 SharedMemory class instance or a more advanced SharedMemoryManager class instance. Variables in basic python data types can be conveniently declared using the built-in ShareableList 在该缓冲区上声明和声明变量。 numpy.ndarray等高级数据类型的变量可以通过声明时指定内存缓冲区来共享

示例numpy.ndarray

import numpy as np
from multiprocessing import shared_memory

# setting up
data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
d_shape = (len(data),)
d_type = np.int64
d_size = np.dtype(d_type).itemsize * np.prod(d_shape)

# IN THE MAIN PROCESS
# allocate new shared memory
shm = shared_memory.SharedMemory(create=True, size=d_size)
shm_name = shm.name
# numpy array on shared memory buffer
a = np.ndarray(shape=d_shape, dtype=d_type, buffer=shm.buf)
# copy data into shared memory ndarray once
a[:] = data[:]

# IN ANOTHER PROCESS
# reuse existing shared memory
ex_shm = shared_memory.SharedMemory(name=shm_name)
# numpy array b uses the same memory buffer as a
b = np.ndarray(shape=d_shape, dtype=d_type, buffer=ex_shm.buf)
# changes in b will be reflected in a and vice versa...
ex_shm.close()  # close after using

# IN THE MAIN PROCESS
shm.close()  # close after using
shm.unlink()  # free memory

在上面的代码中,ab数组使用相同的底层内存,可以直接更新相同的内存,这在机器学习中非常有用。但是,您应该注意并发更新问题并决定如何处理它们,要么使用 Lock/partitioned accesses/or 接受随机更新(所谓的 HogWild 样式)。