Spark 最快的创建 numpy 数组 RDD 的方法
Spark fastest way for creating RDD of numpy arrays
我的 spark 应用程序使用 RDD 的 numpy 数组。
目前,我正在从 AWS S3 读取我的数据,它表示为
一个简单的文本文件,其中每一行都是一个向量,每个元素由 space 分隔,例如:
1 2 3
5.1 3.6 2.1
3 0.24 1.333
我正在使用 numpy 的函数 loadtxt()
以便从中创建一个 numpy 数组。
但是,这种方法似乎很慢,我的应用程序花费了太多时间(我认为)将我的数据集转换为 numpy 数组。
你能建议我一个更好的方法吗?例如,我应该将我的数据集保存为二进制文件吗?
我应该以其他方式创建 RDD 吗?
关于我如何创建 RDD 的一些代码:
data = sc.textFile("s3_url", initial_num_of_partitions).mapPartitions(readData)
读取数据函数:
def readPointBatch(iterator):
return [(np.loadtxt(iterator,dtype=np.float64)]
您不应在使用 Spark 时使用 numpy
。 Spark 有自己的数据处理方法,确保您有时非常大的文件不会立即加载到内存中,从而超过内存限制。你应该像这样用 Spark 加载你的文件:
data = sc.textFile("s3_url", initial_num_of_partitions) \
.map(lambda row: map(lambda x: float(x), row.split(' ')))
现在这将根据您的示例输出这样的 RDD
:
>>> print(data.collect())
[[1.0, 2.0, 3.0], [5.1, 3.6, 2.1], [3.0, 0.24, 1.333]]
@edit关于文件格式和numpy
用法的一些建议:
文本文件与 CSV、TSV、Parquet 或任何您喜欢的格式一样好。根据有关二进制文件加载的 Spark 文档,二进制文件不是首选:
binaryFiles(path, minPartitions=None)
Note: Experimental
Read a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
Note: Small files are preferred, large file is also allowable, but may cause bad performance.
至于 numpy
用法,如果我是你,我肯定会尝试用本机 Spark 替换任何外部包,例如 pyspark.mlib.random
用于随机化:http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.random
简单地用 numpy.fromstring
映射会更加地道,速度也稍微快一些,如下所示:
import numpy as np.
path = ...
initial_num_of_partitions = ...
data = (sc.textFile(path, initial_num_of_partitions)
.map(lambda s: np.fromstring(s, dtype=np.float64, sep=" ")))
但是忽略了你的方法没有什么特别的错误。据我所知,使用基本配置,读取数据大约慢两倍,比创建虚拟 numpy 数组慢一点。
看来问题出在其他地方。这可能是集群配置错误、从 S3 获取数据的成本,甚至是不切实际的期望。
在这种情况下最好的办法是使用 pandas io 库。
请参考这个问题:
.
在那里您将看到如何替换 np.loadtxt()
函数,这样
创建 numpy 数组的 RDD 会更快。
我的 spark 应用程序使用 RDD 的 numpy 数组。
目前,我正在从 AWS S3 读取我的数据,它表示为
一个简单的文本文件,其中每一行都是一个向量,每个元素由 space 分隔,例如:
1 2 3
5.1 3.6 2.1
3 0.24 1.333
我正在使用 numpy 的函数 loadtxt()
以便从中创建一个 numpy 数组。
但是,这种方法似乎很慢,我的应用程序花费了太多时间(我认为)将我的数据集转换为 numpy 数组。
你能建议我一个更好的方法吗?例如,我应该将我的数据集保存为二进制文件吗? 我应该以其他方式创建 RDD 吗?
关于我如何创建 RDD 的一些代码:
data = sc.textFile("s3_url", initial_num_of_partitions).mapPartitions(readData)
读取数据函数:
def readPointBatch(iterator):
return [(np.loadtxt(iterator,dtype=np.float64)]
您不应在使用 Spark 时使用 numpy
。 Spark 有自己的数据处理方法,确保您有时非常大的文件不会立即加载到内存中,从而超过内存限制。你应该像这样用 Spark 加载你的文件:
data = sc.textFile("s3_url", initial_num_of_partitions) \
.map(lambda row: map(lambda x: float(x), row.split(' ')))
现在这将根据您的示例输出这样的 RDD
:
>>> print(data.collect())
[[1.0, 2.0, 3.0], [5.1, 3.6, 2.1], [3.0, 0.24, 1.333]]
@edit关于文件格式和numpy
用法的一些建议:
文本文件与 CSV、TSV、Parquet 或任何您喜欢的格式一样好。根据有关二进制文件加载的 Spark 文档,二进制文件不是首选:
binaryFiles(path, minPartitions=None)
Note: Experimental
Read a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
Note: Small files are preferred, large file is also allowable, but may cause bad performance.
至于 numpy
用法,如果我是你,我肯定会尝试用本机 Spark 替换任何外部包,例如 pyspark.mlib.random
用于随机化:http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.random
简单地用 numpy.fromstring
映射会更加地道,速度也稍微快一些,如下所示:
import numpy as np.
path = ...
initial_num_of_partitions = ...
data = (sc.textFile(path, initial_num_of_partitions)
.map(lambda s: np.fromstring(s, dtype=np.float64, sep=" ")))
但是忽略了你的方法没有什么特别的错误。据我所知,使用基本配置,读取数据大约慢两倍,比创建虚拟 numpy 数组慢一点。
看来问题出在其他地方。这可能是集群配置错误、从 S3 获取数据的成本,甚至是不切实际的期望。
在这种情况下最好的办法是使用 pandas io 库。
请参考这个问题:
在那里您将看到如何替换 np.loadtxt()
函数,这样
创建 numpy 数组的 RDD 会更快。