在 Python 中从 DataFlow 读取 snappy 或 lzo 压缩文件
Read snappy or lzo compressed files from DataFlow in Python
有没有办法使用 Apache Beam 的 Python SDK 读取 DataFlow 上的 snappy 或 lzo 压缩文件?
由于我找不到更简单的方法,这是我目前的方法(这似乎完全过分且效率低下):
- 启动 DataProc 集群
- 在新集群中使用 hive 解压缩此类数据并将其放置在临时位置
- 停止 DataProc 集群
- 运行 从这些临时未压缩数据中读取的 DataFlow 作业
- 清理临时未压缩数据
我不认为现在有任何内置的方法可以用 Beam 来做到这一点。 Python beam 支持 Gzip、bzip2 和 deflate。
选项1:读入整个文件并手动解压
- 创建一个 custom source 以生成文件名列表(即通过列出目录从管道选项播种),并将它们作为记录发出
- 在下面的ParDo中,手动读取每个文件并解压。如果您已将数据存储在 GCS 文件中,则需要使用 GCS 库来读取 GCS 文件。
此解决方案的执行速度可能不会那么快,并且无法将大文件加载到内存中。但是如果你的文件很小,它可能就足够了。
选项 2:向 Beam 添加新的解压缩器。
您可以为 beam 贡献一个 decompressor。看起来您需要实现解压缩器逻辑,在编写管道时提供一些常量来指定它。
我认为限制之一是必须能够扫描文件并一次将其分块解压缩。如果压缩格式需要将整个文件读入内存,那么它可能无法工作。这是因为 TextIO 库被设计为基于记录的,它支持读取不适合内存的大文件并将它们分解成小记录进行处理。
有没有办法使用 Apache Beam 的 Python SDK 读取 DataFlow 上的 snappy 或 lzo 压缩文件?
由于我找不到更简单的方法,这是我目前的方法(这似乎完全过分且效率低下):
- 启动 DataProc 集群
- 在新集群中使用 hive 解压缩此类数据并将其放置在临时位置
- 停止 DataProc 集群
- 运行 从这些临时未压缩数据中读取的 DataFlow 作业
- 清理临时未压缩数据
我不认为现在有任何内置的方法可以用 Beam 来做到这一点。 Python beam 支持 Gzip、bzip2 和 deflate。
选项1:读入整个文件并手动解压
- 创建一个 custom source 以生成文件名列表(即通过列出目录从管道选项播种),并将它们作为记录发出
- 在下面的ParDo中,手动读取每个文件并解压。如果您已将数据存储在 GCS 文件中,则需要使用 GCS 库来读取 GCS 文件。
此解决方案的执行速度可能不会那么快,并且无法将大文件加载到内存中。但是如果你的文件很小,它可能就足够了。
选项 2:向 Beam 添加新的解压缩器。
您可以为 beam 贡献一个 decompressor。看起来您需要实现解压缩器逻辑,在编写管道时提供一些常量来指定它。
我认为限制之一是必须能够扫描文件并一次将其分块解压缩。如果压缩格式需要将整个文件读入内存,那么它可能无法工作。这是因为 TextIO 库被设计为基于记录的,它支持读取不适合内存的大文件并将它们分解成小记录进行处理。