使用 Python 从 Google Cloud Storage 逐行读取巨大的 JSON
Read huge JSON line by line from Google Cloud Storage with Python
我知道我应该有一个代码,但我还没有任何有用的东西。
我的 GCS 上有 ~300GB JSON 文件 gs://path/listings_all.json
最后我试图将它导入 BigQuery 但它有一些错误的数据结构(我通过 mongoexport
从 MongoDB 获得它)
invalid field name "$date". Fields must contain
only letters, numbers, and underscores, start with a letter or
underscore, and be at most 128 characters long
所以,现在我的方法是以某种方式从 GCS 处理它逐行读取源文件,然后使用 python API.
将每个处理过的行上传到 BigQuery
下面是简单的 reader 我已经将原始大文件中的 100 行样本放在一起进行测试:
import json
from pprint import pprint
with open('schema_in_10.json') as f:
for line in f:
j_content = json.loads(line)
# print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
# // geo { lat, lng}'])
print('------')
pprint(j_content['is_location_exact'])
pprint(j_content['zipcode'])
pprint(j_content['name'])
能否请您帮助我如何使用 Python3 从 Google 云存储逐行读取或流式传输巨大的 JSON?
使用内置 json 解析器逐行解析 json 文件是行不通的(除非它实际上是 "json lines" doc of course), so you want a streaming parser。
但是 虽然这会解决内存使用问题,但它不会修复无效的 json,所以最好的办法是先修复无效的 [=20] =] 将源作为纯文本文件,在 python 中或使用 sed
或一些类似的工具,然后使用增量解析器来解析您的内容。
def fixfile(sourcepath, destpath):
with open(sourcepath) as source, open(destpath, "w") as dest:
for line in source:
# you may want to use a regexp if this simple solution
# breaks something else
line = line.replace("$date", "date")
dest.write(line)
逐行读取它然后尝试流式传输到 BigQuery 不会在您的本地计算机上扩展到 300GB,您将很难让这个 TBH 正常工作。
有几个可扩展的选项:
- 编写 Cloud Dataflow 管道以从 GCS 读取您的文件(它将为您扩展并并行读取),更正字段名称,然后写入 BigQuery。参见 here。
- 使用 CSV 而不是 JSON 作为格式并使用数据中未出现的分隔符将其直接加载到 BigQuery 中。这会将每条记录加载到单个字符串列中,然后您可以使用 BigQuery 的 JSON 函数来提取您需要的内容。参见 here。
这是 GCP 数据流中的解决方案实施示例,对应于 . You'll need to implement the json correction in function json_processor. You can run this code in a Datalab 笔记本中的第一个建议。
# Datalab might need an older version of pip
# !pip install pip==9.0.3
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"
def json_processor(row):
import json
d = json.loads(row)
return {'name': d['name'], 'zipcode': d['zipcode']}
options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"
p = beam.Pipeline(options=options)
(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
| "json_processor" >> beam.Map(json_processor)
| "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name,
dataset=bigquery_dataset_name,
project=project_id,
schema=schema,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_EMPTY'))
)
p.run()
smart_open 现在支持流式传输 GCS 文件。
from smart_open import open
# stream from GCS
with open('gs://my_bucket/my_file.txt') as fin:
for line in fin:
print(line)
# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
fout.write(b'hello world')
我知道我应该有一个代码,但我还没有任何有用的东西。
我的 GCS 上有 ~300GB JSON 文件 gs://path/listings_all.json
最后我试图将它导入 BigQuery 但它有一些错误的数据结构(我通过 mongoexport
从 MongoDB 获得它)
invalid field name "$date". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 128 characters long
所以,现在我的方法是以某种方式从 GCS 处理它逐行读取源文件,然后使用 python API.
将每个处理过的行上传到 BigQuery下面是简单的 reader 我已经将原始大文件中的 100 行样本放在一起进行测试:
import json
from pprint import pprint
with open('schema_in_10.json') as f:
for line in f:
j_content = json.loads(line)
# print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
# // geo { lat, lng}'])
print('------')
pprint(j_content['is_location_exact'])
pprint(j_content['zipcode'])
pprint(j_content['name'])
能否请您帮助我如何使用 Python3 从 Google 云存储逐行读取或流式传输巨大的 JSON?
使用内置 json 解析器逐行解析 json 文件是行不通的(除非它实际上是 "json lines" doc of course), so you want a streaming parser。
但是 虽然这会解决内存使用问题,但它不会修复无效的 json,所以最好的办法是先修复无效的 [=20] =] 将源作为纯文本文件,在 python 中或使用 sed
或一些类似的工具,然后使用增量解析器来解析您的内容。
def fixfile(sourcepath, destpath):
with open(sourcepath) as source, open(destpath, "w") as dest:
for line in source:
# you may want to use a regexp if this simple solution
# breaks something else
line = line.replace("$date", "date")
dest.write(line)
逐行读取它然后尝试流式传输到 BigQuery 不会在您的本地计算机上扩展到 300GB,您将很难让这个 TBH 正常工作。
有几个可扩展的选项:
- 编写 Cloud Dataflow 管道以从 GCS 读取您的文件(它将为您扩展并并行读取),更正字段名称,然后写入 BigQuery。参见 here。
- 使用 CSV 而不是 JSON 作为格式并使用数据中未出现的分隔符将其直接加载到 BigQuery 中。这会将每条记录加载到单个字符串列中,然后您可以使用 BigQuery 的 JSON 函数来提取您需要的内容。参见 here。
这是 GCP 数据流中的解决方案实施示例,对应于
# Datalab might need an older version of pip
# !pip install pip==9.0.3
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"
def json_processor(row):
import json
d = json.loads(row)
return {'name': d['name'], 'zipcode': d['zipcode']}
options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"
p = beam.Pipeline(options=options)
(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
| "json_processor" >> beam.Map(json_processor)
| "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name,
dataset=bigquery_dataset_name,
project=project_id,
schema=schema,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_EMPTY'))
)
p.run()
smart_open 现在支持流式传输 GCS 文件。
from smart_open import open
# stream from GCS
with open('gs://my_bucket/my_file.txt') as fin:
for line in fin:
print(line)
# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
fout.write(b'hello world')