如何将二进制文件从 hdfs 读入 Spark 数据帧?
How can I read in a binary file from hdfs into a Spark dataframe?
我正在尝试将一些代码从 pandas 移植到 (py)Spark。不幸的是,我已经在输入部分失败了,我想在其中读取二进制数据并将其放入 Spark Dataframe。
到目前为止,我正在使用 numpy 中的 fromfile
:
dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data=data[1:] #throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)
但是对于 Spark,我找不到如何去做。到目前为止,我的解决方法是使用 csv-Files 而不是二进制文件,但这不是理想的解决方案。我知道我不应该将 numpy 的 fromfile
与 spark 一起使用。
如何读取已加载到 hdfs 中的二进制文件?
我试过
fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin])
fileRDD.map(lambda x: ???)
但它给我一个 No such file or directory
错误。
我看到这个问题:
但这只有在我将文件存储在驱动程序节点的主目录中时才有效。
编辑:
请查看此处提到的 sc.binaryFiles 的使用:
尝试使用:
hdfs://machine_host_name:8020/user/bin_file1.bin
你是 fs.defaultFS 中的主机名 core-site.xml
因此,对于像我一样以 Spark 开头并偶然发现二进制文件的任何人。这是我解决它的方法:
dt=np.dtype([('idx_metric','>i4'),('idx_resource','>i4'),('date','>i4'),
('value','>f8'),('pollID','>i2')])
schema=StructType([StructField('idx_metric',IntegerType(),False),
StructField('idx_resource',IntegerType(),False),
StructField('date',IntegerType),False),
StructField('value',DoubleType(),False),
StructField('pollID',IntegerType(),False)])
filenameRdd=sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')
def read_array(rdd):
#output=zlib.decompress((bytes(rdd[1])),15+32) # in case also zipped
array=np.frombuffer(bytes(rdd[1])[20:],dtype=dt) # remove Header (20 bytes)
array=array.newbyteorder().byteswap() # big Endian
return array.tolist()
unzipped=filenameRdd.flatMap(read_array)
bin_df=sqlContext.createDataFrame(unzipped,schema)
现在您可以使用数据框在 Spark 中做任何您想做的事情。
我最近做了这样的事情:
from struct import unpack_from
# creates an RDD of binaryrecords for determinted record length
binary_rdd = sc.binaryRecords("hdfs://" + file_name, record_length)
# map()s each binary record to unpack() it
unpacked_rdd = binary_rdd.map(lambda record: unpack_from(unpack_format, record))
# registers a data frame with this schema; registerTempTable() it as table_name
raw_df = sqlc.createDataFrame(unpacked_rdd, sparkSchema)
raw_df.registerTempTable(table_name)
其中 unpack_format 和 sparkSchema 必须是 "in-sync"。
unpack_format 是 Python 的 unpack() 和 unpack_from() 函数使用的格式,如中所述
https://docs.python.org/2/library/struct.html#format-characters
sparkSchema 是定义数据框模式的变量。请参阅 https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame
中的示例
我有一个动态生成 unpack_format 和 sparkSchema 变量的脚本;两者同时进行。 (它是一个更大的代码库的一部分,所以为了便于阅读而不在这里发布)
unpack_format 和 sparkSchema 可以定义如下,例如,
from pyspark.sql.types import *
unpack_format = '<' # '<' means little-endian: https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
sparkSchema = StructType()
record_length = 0
unpack_format += '35s' # 35 bytes that represent a character string
sparkSchema.add("FirstName", 'string', True) # True = nullable
record_length += 35
unpack_format += 'H' # 'H' = unsigned 2-byte integer
sparkSchema.add("ZipCode", 'integer', True)
record_length += 2
# and so on for each field..
从Spark 3.0开始,Spark支持二进制文件数据源,它读取二进制文件并将每个文件转换为包含文件原始内容和元数据的单个记录。
https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
我正在尝试将一些代码从 pandas 移植到 (py)Spark。不幸的是,我已经在输入部分失败了,我想在其中读取二进制数据并将其放入 Spark Dataframe。
到目前为止,我正在使用 numpy 中的 fromfile
:
dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data=data[1:] #throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)
但是对于 Spark,我找不到如何去做。到目前为止,我的解决方法是使用 csv-Files 而不是二进制文件,但这不是理想的解决方案。我知道我不应该将 numpy 的 fromfile
与 spark 一起使用。
如何读取已加载到 hdfs 中的二进制文件?
我试过
fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin])
fileRDD.map(lambda x: ???)
但它给我一个 No such file or directory
错误。
我看到这个问题:
编辑: 请查看此处提到的 sc.binaryFiles 的使用:
尝试使用:
hdfs://machine_host_name:8020/user/bin_file1.bin
你是 fs.defaultFS 中的主机名 core-site.xml
因此,对于像我一样以 Spark 开头并偶然发现二进制文件的任何人。这是我解决它的方法:
dt=np.dtype([('idx_metric','>i4'),('idx_resource','>i4'),('date','>i4'),
('value','>f8'),('pollID','>i2')])
schema=StructType([StructField('idx_metric',IntegerType(),False),
StructField('idx_resource',IntegerType(),False),
StructField('date',IntegerType),False),
StructField('value',DoubleType(),False),
StructField('pollID',IntegerType(),False)])
filenameRdd=sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')
def read_array(rdd):
#output=zlib.decompress((bytes(rdd[1])),15+32) # in case also zipped
array=np.frombuffer(bytes(rdd[1])[20:],dtype=dt) # remove Header (20 bytes)
array=array.newbyteorder().byteswap() # big Endian
return array.tolist()
unzipped=filenameRdd.flatMap(read_array)
bin_df=sqlContext.createDataFrame(unzipped,schema)
现在您可以使用数据框在 Spark 中做任何您想做的事情。
我最近做了这样的事情:
from struct import unpack_from
# creates an RDD of binaryrecords for determinted record length
binary_rdd = sc.binaryRecords("hdfs://" + file_name, record_length)
# map()s each binary record to unpack() it
unpacked_rdd = binary_rdd.map(lambda record: unpack_from(unpack_format, record))
# registers a data frame with this schema; registerTempTable() it as table_name
raw_df = sqlc.createDataFrame(unpacked_rdd, sparkSchema)
raw_df.registerTempTable(table_name)
其中 unpack_format 和 sparkSchema 必须是 "in-sync"。
unpack_format 是 Python 的 unpack() 和 unpack_from() 函数使用的格式,如中所述 https://docs.python.org/2/library/struct.html#format-characters
sparkSchema 是定义数据框模式的变量。请参阅 https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame
中的示例
我有一个动态生成 unpack_format 和 sparkSchema 变量的脚本;两者同时进行。 (它是一个更大的代码库的一部分,所以为了便于阅读而不在这里发布)
unpack_format 和 sparkSchema 可以定义如下,例如,
from pyspark.sql.types import *
unpack_format = '<' # '<' means little-endian: https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
sparkSchema = StructType()
record_length = 0
unpack_format += '35s' # 35 bytes that represent a character string
sparkSchema.add("FirstName", 'string', True) # True = nullable
record_length += 35
unpack_format += 'H' # 'H' = unsigned 2-byte integer
sparkSchema.add("ZipCode", 'integer', True)
record_length += 2
# and so on for each field..
从Spark 3.0开始,Spark支持二进制文件数据源,它读取二进制文件并将每个文件转换为包含文件原始内容和元数据的单个记录。
https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html