如何在pyspark中读取二进制数据

How to read binary data in pyspark

我正在读取二进制文件http://snap.stanford.edu/data/amazon/productGraph/image_features/image_features.b 使用 pyspark。

import array
from io import StringIO

img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 4106)

def mapper(features):
    a = array.array('f')
    a.frombytes(features)
    return a.tolist()

def byte_mapper(bytes):
    return str(bytes)

decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])

当使用

从rdd中选择product_id
decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])

product_id 的输出是

["b'1582480311'", "b'\x00\x00\x00\x00\x88c-?\xeb\xe2'", "b'7@\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\xec/\x0b?\x00\x00\x00\x00K\xea'", "b'\x00\x00c\x7f\xd9?\x00\x00\x00\x00'", "b'L\xa6\n>\x00\x00\x00\x00\xfe\xd4'", "b'\x00\x00\x00\x00\x00\x00\xe5\xd0\xa2='", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'", "b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'"]

文件托管在 s3 上。 每行中的文件前 10 个字节为 product_id,接下来的 4096 个字节为 image_features 我能够提取所有 4096 个图像特征,但在读取前 10 个字节并将其转换为正确的可读格式时遇到问题。

编辑:

最后,问题来自于recordLength。不是 4096 + 10,而是 4096*4 + 10。正在更新:

img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 16394)

应该可以。 实际上,您可以在下载二进制文件的网站 provided code 中找到它:

for i in range(4096):
     feature.append(struct.unpack('f', f.read(4))) # <-- so 4096 * 4

旧答案:

我认为问题出在您的 byte_mapper 函数上。 这不是将字节转换为字符串的正确方法。你应该使用 decode:

bytes = b'1582480311'
print(str(bytes))
# output: "b'1582480311'"

print(bytes.decode("utf-8"))
# output: '1582480311'

如果您遇到错误:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x88 in position 4: invalid start byte

这意味着 product_id 字符串包含非 utf8 字符。如果不知道输入编码,很难转换成字符串。

但是,您可能希望通过将选项 ignore 添加到 decode 函数来忽略这些字符:

bytes.decode("utf-8", "ignore")