How to read and manipulate a JSON file with Apache Beam in Python
I have a .txt file in JSON format. I want to read, manipulate, and restructure the file (change field names, etc.).
How can I do this with Apache Beam in Python?
To read a JSON file with Apache Beam in Python, you can write a custom coder:
Cf. https://beam.apache.org/documentation/programming-guide/#specifying-coders
import json

class JsonCoder(object):
    """A JSON coder interpreting each line as a JSON string."""
    def encode(self, x):
        # Beam's text sinks expect bytes, so serialize and encode.
        return json.dumps(x).encode('utf-8')
    def decode(self, x):
        # json.loads accepts both str and bytes (Python 3.6+).
        return json.loads(x)
Then you have to specify it when reading or writing the data, for example:
lines = p | 'read_data' >> ReadFromText(known_args.input, coder=JsonCoder())
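For the "restructure / change field names" part of the question, here is a minimal end-to-end sketch, assuming the JsonCoder above, newline-delimited JSON input, and a hypothetical rename of old_name to new_name (paths and field names are placeholders):

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

def rename_field(record, old='old_name', new='new_name'):
    # Hypothetical rename; adapt the field names to your schema.
    if old in record:
        record[new] = record.pop(old)
    return record

with beam.Pipeline() as p:
    (p
     | 'read_data' >> ReadFromText('input.txt', coder=JsonCoder())
     | 'rename_field' >> beam.Map(rename_field)
     | 'write_data' >> WriteToText('output', coder=JsonCoder()))

WriteToText accepts the same coder argument, so each dict is serialized back to one JSON line on write.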
Regards, and happy coding ;)
Suppose you have sample data like this:
{
    "magic": "atMSG",
    "type": "DT",
    "headers": null,
    "messageschemaid": null,
    "messageschema": null,
    "message": {
        "data": {
            "Column_Name_1": "data_in_quotes",
            "Column_Name_2": "data_in_quotes",
            "Column_Name_n": "data_in_quotes"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "20200822230048000000000017887787417",
            "timestamp": "2020-08-22T23:00:48.000",
            "streamPosition": "00003EB9_0000000000000006_00000F4D9C6F8AFF01000001000CD387000C00580188000100000F4D9C333900",
            "transactionId": "some_id"
        }
    }
}
and you only want to read the data under:
"message": {"data": {"Column_Name_1": "data_in_quotes", "Column_Name_2": "data_in_quotes", "Column_Name_n": "data_in_quotes"}}
Note that ReadFromText reads one line at a time, so each JSON record must sit on a single line; the pretty-printed form above is only for readability.
I used the code below to read this kind of NEWLINE_DELIMITED_JSON and write it to BigQuery:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
import json
from pandas import json_normalize  # pandas.io.json.json_normalize is deprecated

class Printer(beam.DoFn):
    """Debug helper: print every element that passes through."""
    def process(self, data_item):
        print(data_item)

def printer(data_item):
    print(data_item)

class custom_json_parser(beam.DoFn):
    """Flatten each record one level and keep only the nested message.data dict."""
    def process(self, element):
        norm = json_normalize(element, max_level=1)
        return norm["message.data"].to_list()

# Placeholder schema: replace Data_Type with BigQuery types such as STRING or INT64.
table_schema = 'Column_name_1:Data_Type,Column_name_2:Data_Type,Column_name_n:Data_Type'

options = PipelineOptions()
p = beam.Pipeline(options=options)

projectId = 'your_project_id'
datasetId = 'Landing'

data_from_source = (p
    | "READ FROM JSON" >> ReadFromText("gs://bucket/folder/file_name_having json data")
    | "PARSE JSON" >> beam.Map(json.loads)
    | "CUSTOM JSON PARSE" >> beam.ParDo(custom_json_parser())
    #| "PRINT DATA" >> beam.ParDo(Printer())  # uncomment to see the data on the GCP Dataflow Notebooks console
    #| WriteToText("gs://ti-project-1/output/", ".txt")  # to write it to a text file instead
    | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
        "{0}:{1}.table_name".format(projectId, datasetId),
        schema=table_schema,
        # write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )
)

result = p.run()
result.wait_until_finish()  # block until the pipeline finishes
The code above will do the following:
- Read and parse the JSON file
- Create the table in GCP BigQuery
- Load the data into the table in TRUNCATE mode; for appends, comment out WRITE_TRUNCATE and uncomment WRITE_APPEND
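If you would rather not depend on pandas just for json_normalize, here is a minimal pandas-free sketch of the same extraction, assuming the record structure shown above (the class name ExtractMessageData is hypothetical):

import apache_beam as beam

class ExtractMessageData(beam.DoFn):
    """Yield only the nested message.data dict from each parsed record."""
    def process(self, element):
        data = element.get("message", {}).get("data")
        if data is not None:  # skip records without the nested payload
            yield data

It is a drop-in replacement for beam.ParDo(custom_json_parser()) in the pipeline above and avoids building a DataFrame for every element.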