How to read and manipulate a JSON file with Apache Beam in Python

I have a .txt file in JSON format. I want to read, manipulate, and restructure the file (change field names, ...). How can I do this with Apache Beam in Python?

To read a JSON file with Apache Beam in Python, you can write a custom coder:

See: https://beam.apache.org/documentation/programming-guide/#specifying-coders

import json

import apache_beam as beam

class JsonCoder(beam.coders.Coder):
    """A JSON coder interpreting each line as a JSON string."""

    def encode(self, x):
        # Beam coders must return bytes, not str.
        return json.dumps(x).encode('utf-8')

    def decode(self, x):
        return json.loads(x)

Then you have to specify it when reading or writing the data, for example:

lines = p | 'read_data' >> ReadFromText(known_args.input, coder=JsonCoder())
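
Putting both pieces together, here is a minimal end-to-end sketch; the input path "input.jsonl" and the field names "old_name"/"new_name" are hypothetical placeholders for your own:

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText

def rename_field(record):
    # Hypothetical transformation: rename the field "old_name" to "new_name".
    record["new_name"] = record.pop("old_name")
    return record

with beam.Pipeline() as p:
    (p
     | 'read_data' >> ReadFromText('input.jsonl', coder=JsonCoder())
     | 'rename_field' >> beam.Map(rename_field)
     | 'write_data' >> WriteToText('output', coder=JsonCoder()))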

Best regards, and good luck with your work ;)

Suppose you have sample data like this:

{
    "magic": "atMSG",
    "type": "DT",
    "headers": null,
    "messageschemaid": null,
    "messageschema": null,
    "message": {
        "data": {
            "Column_Name_1": "data_in_quotes",
            "Column_Name_2": "data_in_quotes",
            "Column_Name_n": "data_in_quotes"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "20200822230048000000000017887787417",
            "timestamp": "2020-08-22T23:00:48.000",
            "streamPosition": "00003EB9_0000000000000006_00000F4D9C6F8AFF01000001000CD387000C00580188000100000F4D9C333900",
            "transactionId": "some_id"
        }
    }
}

and you only want to read the data from "message": {"data": {"Column_Name_1": "data_in_quotes", "Column_Name_2": "data_in_quotes", "Column_Name_n": "data_in_quotes"}}.
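
To see what pandas' json_normalize does to one such record before building the full pipeline, here is a quick illustration; "record" is assumed to hold one parsed JSON object shaped like the sample above:

from pandas import json_normalize

record = {
    "magic": "atMSG",
    "message": {
        "data": {"Column_Name_1": "data_in_quotes"},
        "headers": {"operation": "INSERT"},
    },
}

norm = json_normalize(record, max_level=1)
# max_level=1 stops flattening after one level, so the nested dicts
# survive intact under dotted column names like "message.data".
print(norm["message.data"].to_list())
# [{'Column_Name_1': 'data_in_quotes'}]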

I used the code below to read this type of NEWLINE_DELIMITED_JSON and write it to BigQuery:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from pandas import json_normalize  # pandas.io.json.json_normalize is deprecated

class Printer(beam.DoFn):
    """Prints each element; useful for debugging."""
    def process(self, data_item):
        print(data_item)

def printer(data_item):
    print(data_item)

class custom_json_parser(beam.DoFn):
    """Extracts the nested "message.data" dict from each parsed record."""
    def process(self, element):
        # max_level=1 flattens only one level, leaving "message.data" as a dict column.
        norm = json_normalize(element, max_level=1)
        return norm["message.data"].to_list()

table_schema = 'Column_name_1:Data_Type,Column_name_2:Data_Type,Column_name_n:Data_Type'

options = PipelineOptions()
p = beam.Pipeline(options=options)

projectId = 'your_project_id'
datasetId = 'Landing'

data_from_source = (p
                    | "READ FROM JSON" >> ReadFromText("gs://bucket/folder/file_name_having json data")
                    | "PARSE JSON" >> beam.Map(json.loads)
                    | "CUSTOM JSON PARSE" >> beam.ParDo(custom_json_parser())
                    # | "PRINT DATA" >> beam.ParDo(Printer())  # uncomment to see the data in the GCP Dataflow Notebooks console
                    # | WriteToText("gs://ti-project-1/output/", ".txt")  # to write it to a text file instead
                    | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                        "{0}:{1}.table_name".format(projectId, datasetId),
                        schema=table_schema,
                        # write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    )
)

result = p.run()
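
If you prefer to avoid the pandas dependency, plain dictionary access achieves the same extraction. A sketch, assuming every record really nests its payload under "message" -> "data" (records missing that path are silently skipped):

class ExtractMessageData(beam.DoFn):
    """Pure-Python alternative to the json_normalize step above."""
    def process(self, element):
        data = element.get("message", {}).get("data")
        if data is not None:
            yield data

You would then replace the "CUSTOM JSON PARSE" step with beam.ParDo(ExtractMessageData()).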

The code above will do the following:

  1. Read and parse the JSON file
  2. Create the table in GCP BigQuery
  3. Load the data into the table in TRUNCATE mode. For append, comment out WRITE_TRUNCATE and uncomment WRITE_APPEND
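
Note that p.run() returns a result object without necessarily blocking; when running this as a standalone script you can wait for the load to finish with the standard Beam call:

result = p.run()
result.wait_until_finish()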