使用 Python 从 Google Cloud Storage 逐行读取巨大的 JSON

Read huge JSON line by line from Google Cloud Storage with Python

我知道我应该有一个代码,但我还没有任何有用的东西。

我的 GCS 上有 ~300GB JSON 文件 gs://path/listings_all.json 最后我试图将它导入 BigQuery 但它有一些错误的数据结构(我通过 mongoexport 从 MongoDB 获得它)

invalid field name "$date". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 128 characters long

所以,现在我的方法是以某种方式从 GCS 处理它逐行读取源文件,然后使用 python API.

将每个处理过的行上传到 BigQuery

下面是简单的 reader 我已经将原始大文件中的 100 行样本放在一起进行测试:

import json
from pprint import pprint

with open('schema_in_10.json') as f:
    for line in f:
        j_content = json.loads(line)

        # print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
        # // geo { lat, lng}'])
        print('------')
        pprint(j_content['is_location_exact'])
        pprint(j_content['zipcode'])
        pprint(j_content['name'])

能否请您帮助我如何使用 Python3 从 Google 云存储逐行读取或流式传输巨大的 JSON?

使用内置 json 解析器逐行解析 json 文件是行不通的(除非它实际上是 "json lines" doc of course), so you want a streaming parser

但是 虽然这会解决内存使用问题,但它不会修复无效的 json,所以最好的办法是先修复无效的 [=20] =] 将源作为纯文本文件,在 python 中或使用 sed 或一些类似的工具,然后使用增量解析器来解析您的内容。

def fixfile(sourcepath, destpath):
    with open(sourcepath) as source, open(destpath, "w") as dest:
        for line in source:
            # you may want to use a regexp if this simple solution
            # breaks something else
            line = line.replace("$date", "date")
            dest.write(line)

逐行读取它然后尝试流式传输到 BigQuery 不会在您的本地计算机上扩展到 300GB,您将很难让这个 TBH 正常工作。

有几个可扩展的选项:

  1. 编写 Cloud Dataflow 管道以从 GCS 读取您的文件(它将为您扩展并并行读取),更正字段名称,然后写入 BigQuery。参见 here
  2. 使用 CSV 而不是 JSON 作为格式并使用数据中未出现的分隔符将其直接加载到 BigQuery 中。这会将每条记录加载到单个字符串列中,然后您可以使用 BigQuery 的 JSON 函数来提取您需要的内容。参见 here

这是 GCP 数据流中的解决方案实施示例,对应于 . You'll need to implement the json correction in function json_processor. You can run this code in a Datalab 笔记本中的第一个建议。

# Datalab might need an older version of pip
# !pip install pip==9.0.3

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist 
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"

def json_processor(row):
    import json
    d = json.loads(row)
    return {'name': d['name'], 'zipcode': d['zipcode']}

options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"

p = beam.Pipeline(options=options)

(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
   | "json_processor" >> beam.Map(json_processor)
   | "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name, 
                                                       dataset=bigquery_dataset_name, 
                                                       project=project_id, 
                                                       schema=schema, 
                                                       create_disposition='CREATE_IF_NEEDED',
                                                       write_disposition='WRITE_EMPTY'))
)

p.run()

smart_open 现在支持流式传输 GCS 文件。

from smart_open import open

# stream from GCS
with open('gs://my_bucket/my_file.txt') as fin:
    for line in fin:
        print(line)

# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')