使用 read() 方法从 Amazon S3 读取大型 JSON 文件时出现内存错误
MemoryError when Using the read() Method in Reading a Large Size of JSON file from Amazon S3
我正在尝试使用 Python 从 Amazon S3 将大型 JSON 文件导入 AWS RDS-PostgreSQL。但是,这些错误发生了,
Traceback (most recent call last):
File "my_code.py", line 67, in
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
File "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/response.py", line 76, in read
chunk = self._raw_stream.read(amt)
File "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/response.py", line 239, in read
data = self._fp.read()
File "/usr/lib64/python3.6/http/client.py", line 462, in read
s = self._safe_read(self.length)
File "/usr/lib64/python3.6/http/client.py", line 617, in _safe_read
return b"".join(s)
MemoryError
// my_code.py
import sys
import boto3
import psycopg2
import zipfile
import io
import json
s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()
bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)
def insert_query(data):
query = """
INSERT INTO data_table
SELECT
(src.test->>'url')::varchar, (src.test->>'id')::bigint,
(src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
FROM (SELECT CAST(%s AS JSONB) AS test) src
"""
cursor.execute(query, (json.dumps(data),))
if key.endswith('.zip'):
zip_files = obj['Body'].read()
with io.BytesIO(zip_files) as zf:
zf.seek(0)
with zipfile.ZipFile(zf, mode='r') as z:
for filename in z.namelist():
with z.open(filename) as f:
for line in f:
insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
insert_query(json.loads(line))
connection.commit()
connection.close()
这些问题有什么解决办法吗?任何帮助都可以,非常感谢!
避免将整个输入文件作为 list
行写入内存,可以节省大量资金。
具体来说,这些行在内存使用方面非常糟糕,因为它们涉及整个文件大小的 bytes
对象的峰值内存使用,加上 list
行的完整文件内容以及:
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
对于一个 1 GB 的 500 万行 ASCII 文本文件,在 64 位 Python 3.3+ 上,just 的峰值内存需求大约为 2.3 GB bytes
对象、list
和 list
中的个体 str
。需要 2.3 倍 RAM 的程序将无法扩展到大文件。
要修复,请将原始代码更改为:
file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
for line in file_content:
鉴于 obj['Body']
appears to be usable for lazy streaming 这应该从内存中删除 两个 完整文件数据的副本。使用 TextIOWrapper
意味着 obj['Body']
以块(一次几 KB)的形式延迟读取和解码,并且行也延迟迭代;这将内存需求减少到一个很小的、基本固定的数量(峰值内存成本将取决于最长行的长度),而不管文件大小。
更新:
看起来 StreamingBody
没有实现 io.BufferedIOBase
ABC。它确实有 its own documented API ,但可以用于类似的目的。如果你不能让 TextIOWrapper
为你完成工作(如果它可以工作,它会更有效率和简单),另一种方法是:
file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
for line in file_content:
与使用 TextIOWrapper
不同,它不会从块的批量解码中获益(每一行都单独解码),但在减少内存使用方面它仍应获得相同的好处。
我正在尝试使用 Python 从 Amazon S3 将大型 JSON 文件导入 AWS RDS-PostgreSQL。但是,这些错误发生了,
Traceback (most recent call last):
File "my_code.py", line 67, in
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
File "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/response.py", line 76, in read
chunk = self._raw_stream.read(amt)
File "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/response.py", line 239, in read
data = self._fp.read()
File "/usr/lib64/python3.6/http/client.py", line 462, in read
s = self._safe_read(self.length)
File "/usr/lib64/python3.6/http/client.py", line 617, in _safe_read
return b"".join(s)
MemoryError
// my_code.py
import sys
import boto3
import psycopg2
import zipfile
import io
import json
s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()
bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)
def insert_query(data):
query = """
INSERT INTO data_table
SELECT
(src.test->>'url')::varchar, (src.test->>'id')::bigint,
(src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
FROM (SELECT CAST(%s AS JSONB) AS test) src
"""
cursor.execute(query, (json.dumps(data),))
if key.endswith('.zip'):
zip_files = obj['Body'].read()
with io.BytesIO(zip_files) as zf:
zf.seek(0)
with zipfile.ZipFile(zf, mode='r') as z:
for filename in z.namelist():
with z.open(filename) as f:
for line in f:
insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
insert_query(json.loads(line))
connection.commit()
connection.close()
这些问题有什么解决办法吗?任何帮助都可以,非常感谢!
避免将整个输入文件作为 list
行写入内存,可以节省大量资金。
具体来说,这些行在内存使用方面非常糟糕,因为它们涉及整个文件大小的 bytes
对象的峰值内存使用,加上 list
行的完整文件内容以及:
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
对于一个 1 GB 的 500 万行 ASCII 文本文件,在 64 位 Python 3.3+ 上,just 的峰值内存需求大约为 2.3 GB bytes
对象、list
和 list
中的个体 str
。需要 2.3 倍 RAM 的程序将无法扩展到大文件。
要修复,请将原始代码更改为:
file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
for line in file_content:
鉴于 obj['Body']
appears to be usable for lazy streaming 这应该从内存中删除 两个 完整文件数据的副本。使用 TextIOWrapper
意味着 obj['Body']
以块(一次几 KB)的形式延迟读取和解码,并且行也延迟迭代;这将内存需求减少到一个很小的、基本固定的数量(峰值内存成本将取决于最长行的长度),而不管文件大小。
更新:
看起来 StreamingBody
没有实现 io.BufferedIOBase
ABC。它确实有 its own documented API ,但可以用于类似的目的。如果你不能让 TextIOWrapper
为你完成工作(如果它可以工作,它会更有效率和简单),另一种方法是:
file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
for line in file_content:
与使用 TextIOWrapper
不同,它不会从块的批量解码中获益(每一行都单独解码),但在减少内存使用方面它仍应获得相同的好处。