如何使用 psycopg2 将 JSON 批量加载到 Postgres 中?
How can I bulk load JSON into Postgres using psycopg2?
我一直在从这样的文件中加载 json 数据:
with open("data.json") as jd:
print("loading json")
j = json.load(jd)
print("inserting")
SendToPostGres(j)
def SendToPostGres(incs):
length = len(incs)
processed = 0
pgParams = {
'database': 'mydb',
'user': 'hi',
'password': '2u',
'host': 'somedb.com',
'port': 1111
}
conn = psycopg2.connect(**pgParams)
curs = conn.cursor()
for i in incs:
curs.execute("insert into MY_TABLE (data) values (%s)", [Json(i)])
processed += 1
conn.commit()
print("%s processed, %s remaining" % (processed, length+1-processed))
这是非常低效的。我试过用谷歌搜索这个并查看其他帖子,但我似乎无法获得预期的效果:"For each item in my list of json, create a row in my database with the corresponding data stored as a json type in postgres."
有人可以向我解释批量执行此操作的最有效方法吗?
更新:
根据下面的回答,我尝试更新以使用 extras 中的 execute_values 函数。我现在收到的错误是:
"string index out of range"
请注意,我尝试更改页面大小,因为我认为这可能是相关的。我试过的没有用。但这可能仍然是一个问题。
def SendToPostGres(incs):
values = []
for i in incs:
values.append(json.dumps(i))
pgParams = {
'database': 'MY_DB',
'user': 'hi',
'password': '2u',
'host': 'somedb.com',
'port': 5432
}
conn = psycopg2.connect(**pgParams)
curs = conn.cursor()
try:
psycopg2.extras.execute_values(curs, "insert into incidents (data) values (%s)", values, page_size=len(values))
except Exception as e:
raise e
rows = curs.fetchall()
curs.close()
使用来自 psycopg2 的 extras.execute_values。
在您的查询中使用“%s”语法来指定应在何处注入值。
与您当前的方法相比,这非常快。
from psycopg2 import extras
def queryPostgresBulk(conn, query, values):
_query = query
_values = values
_conn = conn
_cur = _conn.cursor()
try:
extras.execute_values(_cur, _query, _values, page_size=_values.__len__())
except Exception, e:
raise e
rows = _cur.fetchall()
_cur.close()
return rows
OP 评论更新:
使用 json.dumps() 将您的字典列表转换为 字符串 元组 json 字符串的列表,这是函数期望的格式。将 json 字符串 元组 json 字符串的列表传递给它,而不是表示 json 对象的字典。
import json
_values = []
for dict in list
_values.append((json.dumps(dict),))
或列表理解:
_values = [(json.dumps(x),) for x in list]
另外值得指出的是,您正在加载的数据不是有效的 json 格式,在顶层没有单个键。
再次更新OP评论:
您需要提供一个元组列表作为值,json 字符串在该元组中。如果要在值中注入的唯一数据是 json 字符串,则需要将 for 循环构建值更新为:
for i in incs:
values.append((json.dumps(i),))
不知道我为什么要发布这个,因为你否决了我对你问题的两个早期版本的正确答案......希望它能帮助其他人。
我一直在从这样的文件中加载 json 数据:
with open("data.json") as jd:
print("loading json")
j = json.load(jd)
print("inserting")
SendToPostGres(j)
def SendToPostGres(incs):
length = len(incs)
processed = 0
pgParams = {
'database': 'mydb',
'user': 'hi',
'password': '2u',
'host': 'somedb.com',
'port': 1111
}
conn = psycopg2.connect(**pgParams)
curs = conn.cursor()
for i in incs:
curs.execute("insert into MY_TABLE (data) values (%s)", [Json(i)])
processed += 1
conn.commit()
print("%s processed, %s remaining" % (processed, length+1-processed))
这是非常低效的。我试过用谷歌搜索这个并查看其他帖子,但我似乎无法获得预期的效果:"For each item in my list of json, create a row in my database with the corresponding data stored as a json type in postgres."
有人可以向我解释批量执行此操作的最有效方法吗?
更新:
根据下面的回答,我尝试更新以使用 extras 中的 execute_values 函数。我现在收到的错误是:
"string index out of range"
请注意,我尝试更改页面大小,因为我认为这可能是相关的。我试过的没有用。但这可能仍然是一个问题。
def SendToPostGres(incs):
values = []
for i in incs:
values.append(json.dumps(i))
pgParams = {
'database': 'MY_DB',
'user': 'hi',
'password': '2u',
'host': 'somedb.com',
'port': 5432
}
conn = psycopg2.connect(**pgParams)
curs = conn.cursor()
try:
psycopg2.extras.execute_values(curs, "insert into incidents (data) values (%s)", values, page_size=len(values))
except Exception as e:
raise e
rows = curs.fetchall()
curs.close()
使用来自 psycopg2 的 extras.execute_values。
在您的查询中使用“%s”语法来指定应在何处注入值。
与您当前的方法相比,这非常快。
from psycopg2 import extras
def queryPostgresBulk(conn, query, values):
_query = query
_values = values
_conn = conn
_cur = _conn.cursor()
try:
extras.execute_values(_cur, _query, _values, page_size=_values.__len__())
except Exception, e:
raise e
rows = _cur.fetchall()
_cur.close()
return rows
OP 评论更新:
使用 json.dumps() 将您的字典列表转换为 字符串 元组 json 字符串的列表,这是函数期望的格式。将 json 字符串 元组 json 字符串的列表传递给它,而不是表示 json 对象的字典。
import json
_values = []
for dict in list
_values.append((json.dumps(dict),))
或列表理解:
_values = [(json.dumps(x),) for x in list]
另外值得指出的是,您正在加载的数据不是有效的 json 格式,在顶层没有单个键。
再次更新OP评论:
您需要提供一个元组列表作为值,json 字符串在该元组中。如果要在值中注入的唯一数据是 json 字符串,则需要将 for 循环构建值更新为:
for i in incs:
values.append((json.dumps(i),))
不知道我为什么要发布这个,因为你否决了我对你问题的两个早期版本的正确答案......希望它能帮助其他人。