使用 psycopg2 批量更新 Postgres DB 中的行
Bulk update of rows in Postgres DB using psycopg2
我们需要对 Postgres 数据库中的许多行进行批量更新,并希望使用下面的 SQL 语法。我们如何使用 psycopg2 做到这一点?
UPDATE table_to_be_updated
SET msg = update_payload.msg
FROM (VALUES %(update_payload)s) AS update_payload(id, msg)
WHERE table_to_be_updated.id = update_payload.id
RETURNING *
尝试 1 - 传递值
我们需要将嵌套的可迭代格式传递给 psycopg2 查询。对于 update_payload
,我尝试传递列表列表、元组列表和元组的元组。这一切都因各种错误而失败。
尝试 2 - 使用 __conform__
编写自定义 class
我已经尝试编写一个自定义的 class,我们可以将其用于这些操作,这将 return
(VALUES (row1_col1, row1_col2), (row2_col1, row2_col2), (...))
我按照说明 here 编写了这样的代码,但很明显我做错了什么。例如,在这种方法中,我将不得不处理 table 中所有值的引用,这会很麻烦并且容易出错。
class ValuesTable(list):
def __init__(self, *args, **kwargs):
super(ValuesTable, self).__init__(*args, **kwargs)
def __repr__(self):
data_in_sql = ""
for row in self:
str_values = ", ".join([str(value) for value in row])
data_in_sql += "({})".format(str_values)
return "(VALUES {})".format(data_in_sql)
def __conform__(self, proto):
return self.__repr__()
def getquoted(self):
return self.__repr__()
def __str__(self):
return self.__repr__()
编辑: 如果批量更新可以 faster/cleaner 方式使用另一种语法而不是我原来的问题中的语法来完成,那么我洗耳恭听!
要求:
- Postgres table,由字段 id 和 msg(以及可能的其他字段)组成
- Python 数据包含 msg
的新值
- Postgres table 应该通过 psycopg2
更新
例子Table
CREATE TABLE einstein(
id CHAR(5) PRIMARY KEY,
msg VARCHAR(1024) NOT NULL
);
测试数据
INSERT INTO einstein VALUES ('a', 'empty');
INSERT INTO einstein VALUES ('b', 'empty');
INSERT INTO einstein VALUES ('c', 'empty');
Python 计划
假设的、独立的示例程序,引用了一位著名物理学家的话。
import sys
import psycopg2
from psycopg2.extras import execute_values
def print_table(con):
cur = con.cursor()
cur.execute("SELECT * FROM einstein")
rows = cur.fetchall()
for row in rows:
print(f"{row[0]} {row[1]}")
def update(con, einstein_quotes):
cur = con.cursor()
execute_values(cur, """UPDATE einstein
SET msg = update_payload.msg
FROM (VALUES %s) AS update_payload (id, msg)
WHERE einstein.id = update_payload.id""", einstein_quotes)
con.commit()
def main():
con = None
einstein_quotes = [("a", "Few are those who see with their own eyes and feel with their own hearts."),
("b", "I have no special talent. I am only passionately curious."),
("c", "Life is like riding a bicycle. To keep your balance you must keep moving.")]
try:
con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
print_table(con)
update(con, einstein_quotes)
print("rows updated:")
print_table(con)
except psycopg2.DatabaseError as e:
print(f'Error {e}')
sys.exit(1)
finally:
if con:
con.close()
if __name__ == '__main__':
main()
备选语句
import sys
import psycopg2
from psycopg2.extras import execute_batch
def print_table(con):
cur = con.cursor()
cur.execute("SELECT * FROM einstein")
rows = cur.fetchall()
for row in rows:
print(f"{row[0]} {row[1]}")
def update(con, einstein_quotes, page_size):
cur = con.cursor()
cur.execute("PREPARE updateStmt AS UPDATE einstein SET msg= WHERE id=")
execute_batch(cur, "EXECUTE updateStmt (%(msg)s, %(id)s)", einstein_quotes, page_size=page_size)
cur.execute("DEALLOCATE updateStmt")
con.commit()
def main():
con = None
einstein_quotes = ({"id": "a", "msg": "Few are those who see with their own eyes and feel with their own hearts."},
{"id": "b", "msg": "I have no special talent. I am only passionately curious."},
{"id": "c", "msg": "Life is like riding a bicycle. To keep your balance you must keep moving."})
try:
con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
print_table(con)
update(con, einstein_quotes, 100) #choose some meaningful page_size here
print("rows updated:")
print_table(con)
except psycopg2.DatabaseError as e:
print(f'Error {e}')
sys.exit(1)
finally:
if con:
con.close()
if __name__ == '__main__':
main()
输出
上述程序将向调试控制台输出以下内容:
a empty
b empty
c empty
rows updated:
a Few are those who see with their own eyes and feel with their own hearts.
b I have no special talent. I am only passionately curious.
c Life is like riding a bicycle. To keep your balance you must keep moving.
我们需要对 Postgres 数据库中的许多行进行批量更新,并希望使用下面的 SQL 语法。我们如何使用 psycopg2 做到这一点?
UPDATE table_to_be_updated
SET msg = update_payload.msg
FROM (VALUES %(update_payload)s) AS update_payload(id, msg)
WHERE table_to_be_updated.id = update_payload.id
RETURNING *
尝试 1 - 传递值
我们需要将嵌套的可迭代格式传递给 psycopg2 查询。对于 update_payload
,我尝试传递列表列表、元组列表和元组的元组。这一切都因各种错误而失败。
尝试 2 - 使用 __conform__
编写自定义 class我已经尝试编写一个自定义的 class,我们可以将其用于这些操作,这将 return
(VALUES (row1_col1, row1_col2), (row2_col1, row2_col2), (...))
我按照说明 here 编写了这样的代码,但很明显我做错了什么。例如,在这种方法中,我将不得不处理 table 中所有值的引用,这会很麻烦并且容易出错。
class ValuesTable(list):
def __init__(self, *args, **kwargs):
super(ValuesTable, self).__init__(*args, **kwargs)
def __repr__(self):
data_in_sql = ""
for row in self:
str_values = ", ".join([str(value) for value in row])
data_in_sql += "({})".format(str_values)
return "(VALUES {})".format(data_in_sql)
def __conform__(self, proto):
return self.__repr__()
def getquoted(self):
return self.__repr__()
def __str__(self):
return self.__repr__()
编辑: 如果批量更新可以 faster/cleaner 方式使用另一种语法而不是我原来的问题中的语法来完成,那么我洗耳恭听!
要求:
- Postgres table,由字段 id 和 msg(以及可能的其他字段)组成
- Python 数据包含 msg 的新值
- Postgres table 应该通过 psycopg2 更新
例子Table
CREATE TABLE einstein(
id CHAR(5) PRIMARY KEY,
msg VARCHAR(1024) NOT NULL
);
测试数据
INSERT INTO einstein VALUES ('a', 'empty');
INSERT INTO einstein VALUES ('b', 'empty');
INSERT INTO einstein VALUES ('c', 'empty');
Python 计划
假设的、独立的示例程序,引用了一位著名物理学家的话。
import sys
import psycopg2
from psycopg2.extras import execute_values
def print_table(con):
cur = con.cursor()
cur.execute("SELECT * FROM einstein")
rows = cur.fetchall()
for row in rows:
print(f"{row[0]} {row[1]}")
def update(con, einstein_quotes):
cur = con.cursor()
execute_values(cur, """UPDATE einstein
SET msg = update_payload.msg
FROM (VALUES %s) AS update_payload (id, msg)
WHERE einstein.id = update_payload.id""", einstein_quotes)
con.commit()
def main():
con = None
einstein_quotes = [("a", "Few are those who see with their own eyes and feel with their own hearts."),
("b", "I have no special talent. I am only passionately curious."),
("c", "Life is like riding a bicycle. To keep your balance you must keep moving.")]
try:
con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
print_table(con)
update(con, einstein_quotes)
print("rows updated:")
print_table(con)
except psycopg2.DatabaseError as e:
print(f'Error {e}')
sys.exit(1)
finally:
if con:
con.close()
if __name__ == '__main__':
main()
备选语句
import sys
import psycopg2
from psycopg2.extras import execute_batch
def print_table(con):
cur = con.cursor()
cur.execute("SELECT * FROM einstein")
rows = cur.fetchall()
for row in rows:
print(f"{row[0]} {row[1]}")
def update(con, einstein_quotes, page_size):
cur = con.cursor()
cur.execute("PREPARE updateStmt AS UPDATE einstein SET msg= WHERE id=")
execute_batch(cur, "EXECUTE updateStmt (%(msg)s, %(id)s)", einstein_quotes, page_size=page_size)
cur.execute("DEALLOCATE updateStmt")
con.commit()
def main():
con = None
einstein_quotes = ({"id": "a", "msg": "Few are those who see with their own eyes and feel with their own hearts."},
{"id": "b", "msg": "I have no special talent. I am only passionately curious."},
{"id": "c", "msg": "Life is like riding a bicycle. To keep your balance you must keep moving."})
try:
con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
print_table(con)
update(con, einstein_quotes, 100) #choose some meaningful page_size here
print("rows updated:")
print_table(con)
except psycopg2.DatabaseError as e:
print(f'Error {e}')
sys.exit(1)
finally:
if con:
con.close()
if __name__ == '__main__':
main()
输出
上述程序将向调试控制台输出以下内容:
a empty
b empty
c empty
rows updated:
a Few are those who see with their own eyes and feel with their own hearts.
b I have no special talent. I am only passionately curious.
c Life is like riding a bicycle. To keep your balance you must keep moving.