Reading a gzipped file using Apache Beam, wrapped in TextIOWrapper, results in "'CompressedFile' object has no attribute 'writable'"

I am trying to implement a simple CSV reader in Apache Beam, following a test from the Beam repository: https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python/apache_beam/io/fileio_test.py#L128-L148

def get_csv_reader(readable_file):
  import sys
  import csv
  import io
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with beam.Pipeline() as p:
  content_pc = (p
                | beam.Create([CSV_FILE])
                | fileio.ReadMatches()
                | beam.FlatMap(get_csv_reader)
                | beam.Map(print))

This works fine, with no errors, when CSV_FILE is not compressed. However, if I use a compressed file as input, I get:

<ipython-input-114-4830c3592163> in get_csv_reader(readable_file)
      6   import io
      7   if sys.version_info >= (3, 0):
----> 8     return csv.reader(io.TextIOWrapper(readable_file.open()))
      9   else:
     10     return csv.reader(readable_file.open())

AttributeError: 'CompressedFile' object has no attribute 'writable' [while running 'FlatMap(get_csv_reader)']

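I understand why this happens (TextIOWrapper expects an object that exposes both readable and writable). Could someone who knows Apache Beam/Dataflow better suggest the best way to implement this so that it handles both compressed and uncompressed input?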
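The error can be reproduced without Beam, which may help clarify what TextIOWrapper needs. In CPython, the TextIOWrapper constructor calls readable() and writable() on the object it wraps, so a file-like object missing either method fails in the same way. The sketch below is only an illustration: BareReader is a hypothetical stand-in (not Beam's CompressedFile), and ReadableAdapter is a hypothetical helper that supplies the missing methods by subclassing io.RawIOBase. It assumes only that the wrapped object has a read(size) method that returns bytes.

```python
import csv
import io


class BareReader:
    """Hypothetical stand-in: has read() and readable(), but no writable()."""

    def __init__(self, data):
        self._stream = io.BytesIO(data)

    def readable(self):
        return True

    def read(self, size=-1):
        return self._stream.read(size)


class ReadableAdapter(io.RawIOBase):
    """Hypothetical adapter: gives a read()-only object the io.RawIOBase API.

    io.RawIOBase already provides writable() and seekable() (both False),
    so only readable() and readinto() need to be defined here.
    """

    def __init__(self, inner):
        super().__init__()
        self._inner = inner

    def readable(self):
        return True

    def readinto(self, buffer):
        # Copy at most len(buffer) bytes from the wrapped object.
        chunk = self._inner.read(len(buffer))
        buffer[:len(chunk)] = chunk
        return len(chunk)


def open_text(inner, encoding="utf-8"):
    """Wrap a read()-only binary object so that csv.reader can consume it."""
    raw = ReadableAdapter(inner)
    return io.TextIOWrapper(io.BufferedReader(raw), encoding=encoding, newline="")


if __name__ == "__main__":
    try:
        io.TextIOWrapper(BareReader(b"a,b\n"), encoding="utf-8")
    except AttributeError as exc:
        print(exc)
    print(list(csv.reader(open_text(BareReader(b"a,b\n1,2\n")))))
```

Whether such an adapter suits a particular Beam release has not been checked here; it only shows which methods TextIOWrapper looks for.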

Starting with Beam 2.18.0, you will be able to do the following:

def get_csv_reader(readable_file):
  import sys
  import csv
  import io
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open(compression_type=MY_COMPRESSION)))
  else:
    return csv.reader(readable_file.open(compression_type=MY_COMPRESSION))

with beam.Pipeline() as p:
  content_pc = (p
                | beam.Create([CSV_FILE])
                | fileio.ReadMatches()
                | beam.FlatMap(get_csv_reader)
                | beam.Map(print))

Even after updating Beam to 2.18.0 or 2.19.0, the answer posted by Pablo does not work for me.

Reproducing the failure:

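import csv
import io

import apache_beam as beam
from apache_beam.io.fileio import MatchFiles, ReadMatches
from apache_beam.io.filesystem import CompressionTypes

GZ_FILE_PATTERN='/local/path/to/some-wildcard-*.gz'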

def get_csv_reader(readable_file):
    return csv.reader(io.TextIOWrapper(readable_file.open(compression_type=CompressionTypes.GZIP)))


with beam.Pipeline() as p:
    process = \
        (p
         | MatchFiles(GZ_FILE_PATTERN)
         | ReadMatches()
         | beam.FlatMap(get_csv_reader)
         | beam.Map(print)
         )

Error message:

AttributeError: 'CompressedFile' object has no attribute 'writable' [while running 'FlatMap(get_csv_reader)']

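(Has the patch that fixes this issue not been merged as of v2.19.0?)

For now I have decided to avoid CompressionTypes.GZIP and decompress the file myself.

A version that works: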

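import csv
import gzip
import io

import apache_beam as beam
from apache_beam.io.fileio import MatchFiles
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems

GZ_FILE_PATTERN='/local/path/to/some-wildcard-*.gz'

def get_csv_reader(readable_file_metadata):
    # Workaround: open the file as UNCOMPRESSED so that Beam returns the raw gzip bytes
    with FileSystems.open(readable_file_metadata.path, compression_type=CompressionTypes.UNCOMPRESSED) as fopen:
        # Decompress the whole file in memory ourselves, then decode it as UTF-8 text
        decompressed_str = io.StringIO(gzip.decompress(fopen.read()).decode('utf-8'))
        return csv.reader(decompressed_str)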


with beam.Pipeline() as p:
    process = \
        (p
         | MatchFiles(GZ_FILE_PATTERN)
         | beam.FlatMap(get_csv_reader)
         | beam.Map(print)
         )
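The workaround above reads and decompresses the whole file in memory. A streaming variant that also accepts uncompressed input (which the question asks for) might look like the following sketch. It uses only the standard library and is shown on in-memory streams; iter_csv_rows is a hypothetical helper name, and the code assumes the binary stream supports seek(0), which may not hold for every Beam file object.

```python
import csv
import gzip
import io

# The first two bytes of every gzip stream.
GZIP_MAGIC = b"\x1f\x8b"


def iter_csv_rows(binary_stream, encoding="utf-8"):
    """Yield CSV rows from a seekable binary stream, gzipped or not.

    Assumption: binary_stream supports read(n) and seek(0).
    """
    head = binary_stream.read(2)
    binary_stream.seek(0)
    if head == GZIP_MAGIC:
        # gzip.GzipFile is a regular io.BufferedIOBase, so TextIOWrapper
        # accepts it, and it decompresses lazily as rows are consumed.
        binary_stream = gzip.GzipFile(fileobj=binary_stream, mode="rb")
    text = io.TextIOWrapper(binary_stream, encoding=encoding, newline="")
    yield from csv.reader(text)


if __name__ == "__main__":
    plain = b"a,b\n1,2\n"
    print(list(iter_csv_rows(io.BytesIO(plain))))
    print(list(iter_csv_rows(io.BytesIO(gzip.compress(plain)))))
```

Checking the magic bytes rather than the file extension is a design choice made for this sketch; it keeps working when a gzipped file lacks the .gz suffix.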

As stated, the accepted answer may raise an error. We can work around it by decompressing the file ourselves.

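import gzip
from typing import Iterator, List

import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO


def csv_reader(fn: str) -> Iterator[List[str]]:
    fp = GcsIO().open(fn)
    for r in gzip.open(fp):
        s = r.decode('utf-8')
        # Naive split: does not handle quoted fields that contain commas
        yield s.strip('\n').split(',')


def parse_csv(fn: str):
    rows = csv_reader(fn)
    # todo: manipulate rows and return an iterable
    return rows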


result = (p | beam.Create([known_args.input]) | beam.FlatMap(parse_csv))
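Splitting each line on ',' breaks on quoted fields that contain commas. If that matters for the data, one option is to let gzip.open decode the stream as text and hand it to csv.reader. The sketch below uses only the standard library and an in-memory stream in place of the GCS file; read_gzipped_csv is a hypothetical name, and the code assumes the object passed in behaves like an ordinary binary file object.

```python
import csv
import gzip
import io


def read_gzipped_csv(binary_stream, encoding="utf-8"):
    """Yield parsed CSV rows from a gzipped binary file object."""
    # Mode "rt" makes gzip.open return a text stream; newline="" lets
    # the csv module handle line endings inside quoted fields itself.
    with gzip.open(binary_stream, mode="rt", encoding=encoding, newline="") as text:
        yield from csv.reader(text)


if __name__ == "__main__":
    raw = b'name,note\n"Doe, Jane","said ""hi"""\n'
    for row in read_gzipped_csv(io.BytesIO(gzip.compress(raw))):
        print(row)
```

With the naive split, the second line of this sample would produce three fields instead of two.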