Avro DataFileReader 需要一个可搜索的文件

Avro DataFileReader needs a seekable file

问题是 stdin 不支持 avro 所需的搜索,所以我们读取所有内容进行缓冲,然后将其提供给 avro_wrapper。它在 Python 2 中有效,但在 Python 3 中无效。我尝试了一些解决方案,但其中 none 有效。

# stdin doesn't support seek which is needed by avro... so this hack worked in python 2. This does not work in Python 3. 
# Reading everything to buffer and then giving this to avro_wrapper. 
buf = StringIO()
buf.write(args.input_file.read())
r = DataFileReader(buf, DatumReader())
# Very first record the headers information. Which gives the header names in order along with munge header names for all the record types
# For e.g if we have 2 ports then it will hold the header information of
#   1. port1 on name1 key
#   2. port2 on name2 key and so on 
headers_record = next(r)['headers']

以上会产生 UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf0 in position 17: invalid continuation byte 错误。

然后我们尝试这样做:

input_stream = io.TextIOWrapper(args.input_file.buffer, encoding='latin-1')
sio = io.StringIO(input_stream.read())
r = DataFileReader(sio, DatumReader())
headers_record = next(r)['headers']

这会产生 avro.schema.AvroException: Not an Avro data file: Obj doesn't match b'Obj\x01'. 错误。

另一种方式:

input_stream = io.TextIOWrapper(args.input_file.buffer, encoding='latin-1')
buf = io.BytesIO(input_stream.read().encode('latin-1'))
r = DataFileReader(buf.read(), DatumReader())
headers_record = next(r)['headers']

这会产生 AttributeError: 'bytes' object has no attribute 'seek'" error.

io.BytesIO() 是用于创建包含二进制数据的可搜索 in-memory 文件对象的正确类型。

但是,您错误地从 io.BytesIO() 文件对象中读取了 bytes 数据,并将这些数据而不是实际的文件对象传入。

不要读取,传入实际的io.BytesIO文件对象和从stdin:

读取的二进制数据
buf = io.BytesIO(args.input_file.buffer.read())
r = DataFileReader(buf, DatumReader())

我直接传入args.input_file.buffer数据,假设args.input是解码stdin字节的TextIOWrapper实例,.buffer是底层BufferedReader 提供原始二进制数据的实例。将此数据解码为 Latin-1,然后再次编码为 Latin-1 是没有意义的。只需传递字节即可。