使用 read() 方法从 Amazon S3 读取大型 JSON 文件时出现内存错误

MemoryError when Using the read() Method in Reading a Large Size of JSON file from Amazon S3

我正在尝试使用 Python 从 Amazon S3 将大型 JSON 文件导入 AWS RDS-PostgreSQL。但是,这些错误发生了,

Traceback (most recent call last):

File "my_code.py", line 67, in

file_content = obj['Body'].read().decode('utf-8').splitlines(True)

File "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/response.py", line 76, in read

chunk = self._raw_stream.read(amt)

File "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/response.py", line 239, in read

data = self._fp.read()

File "/usr/lib64/python3.6/http/client.py", line 462, in read

s = self._safe_read(self.length)

File "/usr/lib64/python3.6/http/client.py", line 617, in _safe_read

return b"".join(s)

MemoryError

// my_code.py

import sys
import boto3
import psycopg2
import zipfile
import io
import json

s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()

bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)

def insert_query(data):
    query = """
        INSERT INTO data_table
        SELECT
            (src.test->>'url')::varchar, (src.test->>'id')::bigint,
            (src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
        FROM (SELECT CAST(%s AS JSONB) AS test) src
    """
    cursor.execute(query, (json.dumps(data),))


if key.endswith('.zip'):
    zip_files = obj['Body'].read()
    with io.BytesIO(zip_files) as zf:
        zf.seek(0)
        with zipfile.ZipFile(zf, mode='r') as z:
            for filename in z.namelist():
                with z.open(filename) as f:
                    for line in f:
                        insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
    file_content = obj['Body'].read().decode('utf-8').splitlines(True)
    for line in file_content:
        insert_query(json.loads(line))


connection.commit()
connection.close()

这些问题有什么解决办法吗?任何帮助都可以,非常感谢!

避免将整个输入文件作为 list 行写入内存,可以节省大量资金。

具体来说,这些行在内存使用方面非常糟糕,因为它们涉及整个文件大小的 bytes 对象的峰值内存使用,加上 list 行的完整文件内容以及:

file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:

对于一个 1 GB 的 500 万行 ASCII 文本文件,在 64 位 Python 3.3+ 上,just 的峰值内存需求大约为 2.3 GB bytes 对象、listlist 中的个体 str。需要 2.3 倍 RAM 的程序将无法扩展到大文件。

要修复,请将原始代码更改为:

file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
for line in file_content:

鉴于 obj['Body'] appears to be usable for lazy streaming 这应该从内存中删除 两个 完整文件数据的副本。使用 TextIOWrapper 意味着 obj['Body'] 以块(一次几 KB)的形式延迟读取和解码,并且行也延迟迭代;这将内存需求减少到一个很小的、基本固定的数量(峰值内存成本将取决于最长行的长度),而不管文件大小。

更新:

看起来 StreamingBody 没有实现 io.BufferedIOBase ABC。它确实有 its own documented API ,但可以用于类似的目的。如果你不能让 TextIOWrapper 为你完成工作(如果它可以工作,它会更有效率和简单),另一种方法是:

file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
for line in file_content:

与使用 TextIOWrapper 不同,它不会从块的批量解码中获益(每一行都单独解码),但在减少内存使用方面它仍应获得相同的好处。