如何在pyspark中读取二进制数据
How to read binary data in pyspark
我正在读取二进制文件http://snap.stanford.edu/data/amazon/productGraph/image_features/image_features.b
使用 pyspark。
import array
from io import StringIO
img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 4106)
def mapper(features):
a = array.array('f')
a.frombytes(features)
return a.tolist()
def byte_mapper(bytes):
return str(bytes)
decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])
当使用
从rdd中选择product_id
时
decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])
product_id
的输出是
["b'1582480311'", "b'\x00\x00\x00\x00\x88c-?\xeb\xe2'", "b'7@\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\xec/\x0b?\x00\x00\x00\x00K\xea'", "b'\x00\x00c\x7f\xd9?\x00\x00\x00\x00'", "b'L\xa6\n>\x00\x00\x00\x00\xfe\xd4'", "b'\x00\x00\x00\x00\x00\x00\xe5\xd0\xa2='", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'"]
文件托管在 s3 上。
每行中的文件前 10 个字节为 product_id
,接下来的 4096 个字节为 image_features
我能够提取所有 4096 个图像特征,但在读取前 10 个字节并将其转换为正确的可读格式时遇到问题。
编辑:
最后,问题来自于recordLength
。不是 4096 + 10
,而是 4096*4 + 10
。正在更新:
img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 16394)
应该可以。
实际上,您可以在下载二进制文件的网站 provided code 中找到它:
for i in range(4096):
feature.append(struct.unpack('f', f.read(4))) # <-- so 4096 * 4
旧答案:
我认为问题出在您的 byte_mapper
函数上。
这不是将字节转换为字符串的正确方法。你应该使用 decode
:
bytes = b'1582480311'
print(str(bytes))
# output: "b'1582480311'"
print(bytes.decode("utf-8"))
# output: '1582480311'
如果您遇到错误:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x88 in position 4: invalid start byte
这意味着 product_id
字符串包含非 utf8 字符。如果不知道输入编码,很难转换成字符串。
但是,您可能希望通过将选项 ignore
添加到 decode
函数来忽略这些字符:
bytes.decode("utf-8", "ignore")
我正在读取二进制文件http://snap.stanford.edu/data/amazon/productGraph/image_features/image_features.b 使用 pyspark。
import array
from io import StringIO
img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 4106)
def mapper(features):
a = array.array('f')
a.frombytes(features)
return a.tolist()
def byte_mapper(bytes):
return str(bytes)
decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])
当使用
从rdd中选择product_id
时
decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])
product_id
的输出是
["b'1582480311'", "b'\x00\x00\x00\x00\x88c-?\xeb\xe2'", "b'7@\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\xec/\x0b?\x00\x00\x00\x00K\xea'", "b'\x00\x00c\x7f\xd9?\x00\x00\x00\x00'", "b'L\xa6\n>\x00\x00\x00\x00\xfe\xd4'", "b'\x00\x00\x00\x00\x00\x00\xe5\xd0\xa2='", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'"]
文件托管在 s3 上。
每行中的文件前 10 个字节为 product_id
,接下来的 4096 个字节为 image_features
我能够提取所有 4096 个图像特征,但在读取前 10 个字节并将其转换为正确的可读格式时遇到问题。
编辑:
最后,问题来自于recordLength
。不是 4096 + 10
,而是 4096*4 + 10
。正在更新:
img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 16394)
应该可以。 实际上,您可以在下载二进制文件的网站 provided code 中找到它:
for i in range(4096):
feature.append(struct.unpack('f', f.read(4))) # <-- so 4096 * 4
旧答案:
我认为问题出在您的 byte_mapper
函数上。
这不是将字节转换为字符串的正确方法。你应该使用 decode
:
bytes = b'1582480311'
print(str(bytes))
# output: "b'1582480311'"
print(bytes.decode("utf-8"))
# output: '1582480311'
如果您遇到错误:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x88 in position 4: invalid start byte
这意味着 product_id
字符串包含非 utf8 字符。如果不知道输入编码,很难转换成字符串。
但是,您可能希望通过将选项 ignore
添加到 decode
函数来忽略这些字符:
bytes.decode("utf-8", "ignore")