在 Apache Beam 中读取整个文件
Read whole file in Apache Beam
是否可以在 Apache Beam 中读取整个文件(不是逐行读取)?
例如,我想读取多行JSONs,我的想法是逐个文件读取,从每个文件中提取数据并从列表中创建PCollection。
将源 JSON 预处理为一个 JSON 文件,其中每一行都是单独的 JSON?
谢谢你的提前。
TextIO
逐行读取文件。因此,在您的 test.json 中,每一行都需要包含一个单独的 Json 对象。
Beam 或任何分布式处理引擎的想法是能够并行化输入数据。从您的问题看来,需要进行一些预处理才能将它们拆分为多个 json。请注意,它不必位于单个文件中,您可以有多个文件,每个文件包含任意数量的 json 个文件。 Beam 将并行读取行。
如果有帮助,请采纳答案。
想要对从文件中读取的对象进行并行处理是一个合理的用例。
import apache_beam as beam
from apache_beam.io import fileio
import json
# Make some fake data
for i in range(0,10):
with open(f'/tmp/data{i}.json', 'w') as f:
json.dump({'somethinig':i,'otherthing':[1,2,3]}, f)
filenames = [f'/tmp/data{i}.json' for i in range(0,10)]
with beam.Pipeline() as pipeline:
lines = (
pipeline
| beam.Create(filenames)
| fileio.MatchAll()
| fileio.ReadMatches()
| beam.Map(lambda file: print(file.read_utf8()))
)
是否可以在 Apache Beam 中读取整个文件(不是逐行读取)?
例如,我想读取多行JSONs,我的想法是逐个文件读取,从每个文件中提取数据并从列表中创建PCollection。
将源 JSON 预处理为一个 JSON 文件,其中每一行都是单独的 JSON?
谢谢你的提前。
TextIO
逐行读取文件。因此,在您的 test.json 中,每一行都需要包含一个单独的 Json 对象。
Beam 或任何分布式处理引擎的想法是能够并行化输入数据。从您的问题看来,需要进行一些预处理才能将它们拆分为多个 json。请注意,它不必位于单个文件中,您可以有多个文件,每个文件包含任意数量的 json 个文件。 Beam 将并行读取行。
如果有帮助,请采纳答案。
想要对从文件中读取的对象进行并行处理是一个合理的用例。
import apache_beam as beam
from apache_beam.io import fileio
import json
# Make some fake data
for i in range(0,10):
with open(f'/tmp/data{i}.json', 'w') as f:
json.dump({'somethinig':i,'otherthing':[1,2,3]}, f)
filenames = [f'/tmp/data{i}.json' for i in range(0,10)]
with beam.Pipeline() as pipeline:
lines = (
pipeline
| beam.Create(filenames)
| fileio.MatchAll()
| fileio.ReadMatches()
| beam.Map(lambda file: print(file.read_utf8()))
)