Importing a list of JSON data into PostgreSQL using Python
I am trying to import a list of JSON data into PostgreSQL using Python, but I keep getting this error:
psycopg2.ProgrammingError: can't adapt type 'dict'
whenever the "custom_fields" column in the table I created in the PostgreSQL database has the JSONB[] data type. Is there any way to solve my problem? Any help is appreciated. Thank you very much.
//sample.json
{
    "url": "https://www.abcd.com/",
    "id": 123456789,
    "external_id": null,
    "via": {
        "channel": "email",
        "id": 4,
        "source": {
            "from": {
                "address": "abc@abc.com",
                "name": "abc def"
            },
            "rel": null,
            "to": {
                "address": "def@def.com",
                "name": "def"
            }
        }
    },
    "custom_fields": [
        {
            "id": 234567891,
            "value": "abc_def_ghi"
        },
        {
            "id": 345678912,
            "value": null
        },
        {
            "id": 456789123,
            "value": null
        }
    ]
},
{
    "url": "http://wxyz.com/",
    "id": 987654321,
    "external_id": null,
    "via": {
        "channel": "email",
        "id": 4,
        "source": {
            "from": {
                "address": "xyz@xyz.com",
                "name": "uvw xyz"
            },
            "rel": null,
            "to": {
                "address": "zxc@zxc.com",
                "name": "zxc"
            }
        }
    },
    "custom_fields": [
        {
            "id": 876543219,
            "value": "zxc_vbn_asd"
        },
        {
            "id": 765432198,
            "value": null
        },
        {
            "id": 654321987,
            "value": null
        }
    ]
}
//my_code.py
import json
import psycopg2
connection = psycopg2.connect("host=localhost dbname=sample user=gerald password=1234")
cursor = connection.cursor()
data = []
with open('sample.json') as f:
    for row in f:
        data.append(json.loads(row))
fields = [
'url',
'id',
'external_id',
'via',
'custom_fields'
]
for item in data:
    my_data = [item[field] for field in fields]
    for key, value in enumerate(my_data):
        if isinstance(value, dict):
            my_data[key] = json.dumps(value)
    insert_query = "INSERT INTO crm VALUES (%s, %s, %s, %s, %s)"
    cursor.execute(insert_query, tuple(my_data))
connection.commit()
connection.close()
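A side note on the psycopg2 approach: the "can't adapt type 'dict'" error comes from the custom_fields value, which is a Python list of dicts; the loop above only serializes plain dicts, so the dicts inside that list reach the driver unadapted. One common workaround (a minimal sketch, not the only fix) is to declare via and custom_fields as plain jsonb columns instead of JSONB[] and wrap any dict or list in psycopg2.extras.Json, which handles the serialization. The assumed table layout and the assumption that sample.json holds a single JSON array are mine:

import psycopg2
from psycopg2.extras import Json

connection = psycopg2.connect("host=localhost dbname=sample user=gerald password=1234")
cursor = connection.cursor()

# Parse the whole file at once; json.load() copes with a pretty-printed
# document, unlike reading it line by line. Assumes sample.json is a JSON array.
import json
with open('sample.json') as f:
    data = json.load(f)

fields = ['url', 'id', 'external_id', 'via', 'custom_fields']

# Assumed schema: crm(url text, id bigint, external_id text, via jsonb, custom_fields jsonb)
insert_query = "INSERT INTO crm VALUES (%s, %s, %s, %s, %s)"
for item in data:
    row = [Json(value) if isinstance(value, (dict, list)) else value
           for value in (item[field] for field in fields)]
    cursor.execute(insert_query, tuple(row))

connection.commit()
connection.close()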
A framework like SQLAlchemy would be my first choice for this kind of operation.
The JSON payload in the example above is nested, so you must make sure the target tables in your database reflect the same schema.
Example for your reference:
import json

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy_declarative import Base, Review

engine = create_engine('sqlite:///sqlalchemy_try.db')  # you might want to tweak this dialect to the db of your choice

Base.metadata.bind = engine
DBSession = sessionmaker(bind=engine)
session = DBSession()

with open('sample.json') as f:
    data = f.read()
jsondata = json.loads(data)

r = Review(jsondata['url'], int(jsondata['id']), jsondata['external_id'], jsondata['via']['channel'])
session.add(r)
session.commit()
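The snippet imports Base and Review from a local sqlalchemy_declarative module that is not shown. A minimal sketch of what such a module might look like, assuming a flat review table; every column name and type here is an assumption, and the custom __init__ only exists so Review can be constructed with positional arguments as above:

# sqlalchemy_declarative.py -- hypothetical model module referenced above
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Review(Base):
    __tablename__ = 'review'

    id = Column(Integer, primary_key=True)   # maps the JSON "id"
    url = Column(String, nullable=False)
    external_id = Column(String)
    channel = Column(String)                  # flattened from via.channel

    def __init__(self, url, id, external_id, channel):
        self.url = url
        self.id = id
        self.external_id = external_id
        self.channel = channel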