您如何通过 python 读取 apache beam(数据流)中的 JSON 文件?
How do you read JSON files in apache beam (dataflow) via python?
我正在尝试通过 python 中的 apache beam 读取 JSON 文件,并对其应用一些数据质量规则。
目前我正在使用 beam.io.ReadFromText 读取每个 json 行并使用一些函数来修改数据。
读取 JSON 数据并修改它们的更好方法是什么?
(p
| 'Getdata' >> beam.io.ReadFromText(input)
| 'filter_name' >> beam.FlatMap(lambda line: dq_name(line))
| 'filter_phone' >> beam.FlatMap(lambda line: dq_phone(line))
| 'filter_zip' >> beam.FlatMap(lambda line: dq_zip(line))
| 'filter_address' >> beam.FlatMap(lambda line: dq_city(line))
| 'filter_website' >> beam.FlatMap(lambda line: dq_website(line))
| 'write' >> beam.io.WriteToText(output_prefix) )
注意:我对此还很陌生,如果我目前的方法看起来太笨拙,我很抱歉。
您从错误的方向接近 Apache Beam (Dataflow)。
您正在尝试读取一行,然后一次对这一行应用转换。
相反,您需要将 Beam 视为并行处理器。您将读入所有行 ReadFromText()
,然后将转换并行应用到每一行。
查看函数 beam.ParDo()
。这将允许您创建一个 class 来处理 JSON 文件的每一行。然后,您的代码将包含 ReadFromText()
、ParDo(MyJsonProcessor())
、WriteToText()
.
等主要步骤
请记住,您的 JSON 需要使用换行符分隔 JSON。 http://ndjson.org/
我觉得你的流水线没问题。它将 运行 并行,没有任何问题。仅供参考,如果你使用 FlatMap
只是为了过滤元素,你也可以使用 Filter
.
我正在尝试通过 python 中的 apache beam 读取 JSON 文件,并对其应用一些数据质量规则。 目前我正在使用 beam.io.ReadFromText 读取每个 json 行并使用一些函数来修改数据。 读取 JSON 数据并修改它们的更好方法是什么?
(p
| 'Getdata' >> beam.io.ReadFromText(input)
| 'filter_name' >> beam.FlatMap(lambda line: dq_name(line))
| 'filter_phone' >> beam.FlatMap(lambda line: dq_phone(line))
| 'filter_zip' >> beam.FlatMap(lambda line: dq_zip(line))
| 'filter_address' >> beam.FlatMap(lambda line: dq_city(line))
| 'filter_website' >> beam.FlatMap(lambda line: dq_website(line))
| 'write' >> beam.io.WriteToText(output_prefix) )
注意:我对此还很陌生,如果我目前的方法看起来太笨拙,我很抱歉。
您从错误的方向接近 Apache Beam (Dataflow)。
您正在尝试读取一行,然后一次对这一行应用转换。
相反,您需要将 Beam 视为并行处理器。您将读入所有行 ReadFromText()
,然后将转换并行应用到每一行。
查看函数 beam.ParDo()
。这将允许您创建一个 class 来处理 JSON 文件的每一行。然后,您的代码将包含 ReadFromText()
、ParDo(MyJsonProcessor())
、WriteToText()
.
请记住,您的 JSON 需要使用换行符分隔 JSON。 http://ndjson.org/
我觉得你的流水线没问题。它将 运行 并行,没有任何问题。仅供参考,如果你使用 FlatMap
只是为了过滤元素,你也可以使用 Filter
.