如何使用 psycopg2 将 JSON 批量加载到 Postgres 中?

How can I bulk load JSON into Postgres using psycopg2?

我一直在从这样的文件中加载 json 数据:

with open("data.json") as jd:
    print("loading json")
    j = json.load(jd)
    print("inserting")
    SendToPostGres(j)

def SendToPostGres(incs):
    length = len(incs)
    processed = 0
    pgParams = {
            'database': 'mydb',
            'user': 'hi',
            'password': '2u',
            'host': 'somedb.com',
            'port': 1111
            }
    conn = psycopg2.connect(**pgParams)
    curs = conn.cursor()

    for i in incs:
        curs.execute("insert into MY_TABLE (data) values (%s)", [Json(i)])
        processed += 1
        conn.commit()
        print("%s processed, %s remaining" % (processed, length+1-processed))

这是非常低效的。我试过用谷歌搜索这个并查看其他帖子,但我似乎无法获得预期的效果:"For each item in my list of json, create a row in my database with the corresponding data stored as a json type in postgres."

有人可以向我解释批量执行此操作的最有效方法吗?

更新:

根据下面的回答,我尝试更新以使用 extras 中的 execute_values 函数。我现在收到的错误是:

"string index out of range"

请注意,我尝试更改页面大小,因为我认为这可能是相关的。我试过的没有用。但这可能仍然是一个问题。

def SendToPostGres(incs):
    values = []
    for i in incs:
        values.append(json.dumps(i))

    pgParams = {
            'database': 'MY_DB',
            'user': 'hi',
            'password': '2u',
            'host': 'somedb.com',
            'port': 5432
            }
    conn = psycopg2.connect(**pgParams)
    curs = conn.cursor()

    try:
        psycopg2.extras.execute_values(curs, "insert into incidents (data) values (%s)", values, page_size=len(values))
    except Exception as e:
        raise e
    rows = curs.fetchall()
    curs.close()

使用来自 psycopg2 的 extras.execute_values。

在您的查询中使用“%s”语法来指定应在何处注入值。

与您当前的方法相比,这非常快。

from psycopg2 import extras

def queryPostgresBulk(conn, query, values):

    _query = query
    _values = values
    _conn = conn
    _cur = _conn.cursor()
    try:
        extras.execute_values(_cur, _query, _values, page_size=_values.__len__())
    except Exception, e:
        raise e
    rows = _cur.fetchall()
    _cur.close()

    return rows

OP 评论更新:

使用 json.dumps() 将您的字典列表转换为 字符串 元组 json 字符串的列表,这是函数期望的格式。将 json 字符串 元组 json 字符串的列表传递给它,而不是表示 json 对象的字典。

import json

_values = []
for dict in list
    _values.append((json.dumps(dict),))

或列表理解:

_values = [(json.dumps(x),) for x in list]

另外值得指出的是,您正在加载的数据不是有效的 json 格式,在顶层没有单个键。

再次更新OP评论:

您需要提供一个元组列表作为值,json 字符串在该元组中。如果要在值中注入的唯一数据是 json 字符串,则需要将 for 循环构建值更新为:

for i in incs:
    values.append((json.dumps(i),))

不知道我为什么要发布这个,因为你否决了我对你问题的两个早期版本的正确答案......希望它能帮助其他人。