Python 共享读取内存

Python shared read memory

我正在处理一个大约 8GB 的​​数据集,我还使用 scikit-learn 在其上训练各种 ML 模型。数据集基本上是一维整数向量列表。

如何使数据集可用于多个 python 进程或如何对数据集进行编码以便使用 multiprocessing 的 类?我一直在阅读 ctypes,我也一直在阅读 multiprocessing 的文档,但我很困惑。我只需要让数据对每个进程都可读,这样我就可以用它来训练模型。

我是否需要将共享 multiprocessing 变量作为 ctypes?

如何将数据集表示为 ctypes

可能与 Share Large, Read-Only Numpy Array Between Multiprocessing Processes

重复

您可以将数据集从当前表示形式转换为新的 numpy memmap 对象,并在每个进程中使用它。但无论如何它都不会很快,它只是提供了一些从 ram 中使用数组的抽象,实际上它将是来自 HDD 的文件,部分缓存在 RAM 中。所以你应该更喜欢带有 partial_fit 方法的 scikit-learn 算法,并使用它们。

https://docs.scipy.org/doc/numpy/reference/generated/numpy.memmap.html

实际上 joblib(在 scikit-learn 中用于并行化)会自动将您的数据集转换为 memmap 表示以从不同进程使用它(当然,如果它足够大)。

我假设您能够将整个数据集加载到 RAM 中的 numpy 数组中,并且您正在处理 Linux 或 Mac。 (如果你在 Windows 或者你不能将数组放入 RAM,那么你应该将数组复制到磁盘上的文件并使用 numpy.memmap 访问它。你的计算机将缓存数据尽可能从磁盘到 RAM,并且这些缓存将在进程之间共享,因此这不是一个糟糕的解决方案。)

在上述假设下,如果您需要对通过 multiprocessing 创建的其他进程中的数据集进行只读访问,您只需创建数据集,然后启动其他进程即可。他们将对原始命名空间中的数据具有只读访问权限。他们可以更改原始命名空间中的数据,但这些更改对其他进程不可见(内存管理器会将他们更改的每个内存段复制到本地内存映射中)。

如果您的其他进程需要更改原始数据集并使这些更改对父进程或其他进程可见,您可以使用如下方式:

import multiprocessing
import numpy as np

# create your big dataset
big_data = np.zeros((3, 3))

# create a shared-memory wrapper for big_data's underlying data
# (it doesn't matter what datatype we use, and 'c' is easiest)
# I think if lock=True, you get a serialized object, which you don't want.
# Note: you will need to setup your own method to synchronize access to big_data.
buf = multiprocessing.Array('c', big_data.data, lock=False)

# at this point, buf and big_data.data point to the same block of memory, 
# (try looking at id(buf[0]) and id(big_data.data[0])) but for some reason
# changes aren't propagated between them unless you do the following:
big_data.data = buf

# now you can update big_data from any process:
def add_one_direct():
    big_data[:] = big_data + 1

def add_one(a):
    # People say this won't work, since Process() will pickle the argument.
    # But in my experience Process() seems to pass the argument via shared
    # memory, so it works OK.
    a[:] = a+1

print "starting value:"
print big_data

p = multiprocessing.Process(target=add_one_direct)
p.start()
p.join()

print "after add_one_direct():"
print big_data

p = multiprocessing.Process(target=add_one, args=(big_data,))
p.start()
p.join()

print "after add_one():"
print big_data