在pyspark中加载大于内存的hdf5文件
Loading bigger than memory hdf5 file in pyspark
我有一个以 HDF5 格式存储的大文件(比如 20 Gb)。该文件基本上是一组随时间演变的 3D 坐标(分子模拟轨迹)。这基本上是一个形状数组 (8000 (frames), 50000 (particles), 3 (coordinates))
在常规 python 中,我会简单地使用 h5py
或 pytables
加载 hdf5 数据文件并为数据文件编制索引,就像它是一个 numpy 一样(该库会延迟加载它需要的任何数据需要)。
但是,如果我尝试使用 SparkContext.parallelize
在 Spark 中加载此文件,它显然会阻塞内存:
sc.parallelize(data, 10)
我该如何处理这个问题?大型数组是否有首选数据格式?我可以让rdd在不经过内存的情况下写入磁盘吗?
Spark(和 Hadoop)不支持读取部分 HDF5 二进制文件。 (我怀疑这是因为 HDF5 是一种用于存储文档的容器格式,它允许为文档指定树状层次结构)。
但是如果您需要从本地磁盘读取文件——使用 Spark 是可行的,尤其是当您知道 HDF5 文件的内部结构时。
这是一个 example - 它假定您将 运行 本地 spark 作业,并且您事先知道您的 HDF5 数据集“/mydata”由 100 个块组成。
h5file_path="/absolute/path/to/file"
def readchunk(v):
empty = h5.File(h5file_path)
return empty['/mydata'][v,:]
foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()
更进一步,您可以修改程序以使用 f5['/mydata'].shape[0]
检测块数
下一步将迭代多个数据集(您可以使用 f5.keys()
列出数据集)。
还有 another article "From HDF5 Datasets to Apache Spark RDDs" 描述了类似的方法。
同样的方法也适用于分布式集群,但效率不高。 h5py 要求文件位于本地文件系统中。因此,这可以通过多种方式实现:将文件复制到所有工作人员并将其保存在工作人员磁盘上的同一位置;或者将文件放入 HDFS 并使用 fusefs 挂载 HDFS - 这样工作人员就可以访问该文件。这两种方式都有一些低效,但它应该足以应付临时任务。
这里是优化版本,在每个执行器上只打开一次h5:
h5file_path="/absolute/path/to/file"
_h5file = None
def readchunk(v):
# code below will be executed on executor - in another python process on remote server
# original value for _h5file (None) is sent from driver
# and on executor is updated to h5.File object when the `readchunk` is called for the first time
global _h5file
if _h5file is None:
_h5file = h5.File(h5file_path)
return _h5file['/mydata'][v,:]
foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()
我有一个以 HDF5 格式存储的大文件(比如 20 Gb)。该文件基本上是一组随时间演变的 3D 坐标(分子模拟轨迹)。这基本上是一个形状数组 (8000 (frames), 50000 (particles), 3 (coordinates))
在常规 python 中,我会简单地使用 h5py
或 pytables
加载 hdf5 数据文件并为数据文件编制索引,就像它是一个 numpy 一样(该库会延迟加载它需要的任何数据需要)。
但是,如果我尝试使用 SparkContext.parallelize
在 Spark 中加载此文件,它显然会阻塞内存:
sc.parallelize(data, 10)
我该如何处理这个问题?大型数组是否有首选数据格式?我可以让rdd在不经过内存的情况下写入磁盘吗?
Spark(和 Hadoop)不支持读取部分 HDF5 二进制文件。 (我怀疑这是因为 HDF5 是一种用于存储文档的容器格式,它允许为文档指定树状层次结构)。
但是如果您需要从本地磁盘读取文件——使用 Spark 是可行的,尤其是当您知道 HDF5 文件的内部结构时。
这是一个 example - 它假定您将 运行 本地 spark 作业,并且您事先知道您的 HDF5 数据集“/mydata”由 100 个块组成。
h5file_path="/absolute/path/to/file"
def readchunk(v):
empty = h5.File(h5file_path)
return empty['/mydata'][v,:]
foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()
更进一步,您可以修改程序以使用 f5['/mydata'].shape[0]
下一步将迭代多个数据集(您可以使用 f5.keys()
列出数据集)。
还有 another article "From HDF5 Datasets to Apache Spark RDDs" 描述了类似的方法。
同样的方法也适用于分布式集群,但效率不高。 h5py 要求文件位于本地文件系统中。因此,这可以通过多种方式实现:将文件复制到所有工作人员并将其保存在工作人员磁盘上的同一位置;或者将文件放入 HDFS 并使用 fusefs 挂载 HDFS - 这样工作人员就可以访问该文件。这两种方式都有一些低效,但它应该足以应付临时任务。
这里是优化版本,在每个执行器上只打开一次h5:
h5file_path="/absolute/path/to/file"
_h5file = None
def readchunk(v):
# code below will be executed on executor - in another python process on remote server
# original value for _h5file (None) is sent from driver
# and on executor is updated to h5.File object when the `readchunk` is called for the first time
global _h5file
if _h5file is None:
_h5file = h5.File(h5file_path)
return _h5file['/mydata'][v,:]
foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()