pyspark+psycopg2 将结果写入数据库速度慢
pyspark+psycopg2 is slow in writing the results into the database
我有一个 Spark 作业,它处理数据的速度非常快,但是当它试图将结果写入 postgresql 数据库时,速度非常慢。以下是大部分相关代码:
import psycopg2
def save_df_to_db(records):
# each item in record is a dictionary with 'url', 'tag', 'value' as keys
db_conn = psycopg2.connect(connect_string)
db_conn.autocommit = True
cur = db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
upsert_query = """INSERT INTO mytable (url, tag, value)
VALUES (%(url)s, %(tag)s, %(value)s) ON CONFLICT (url, tag) DO UPDATE SET value = %(value)s"""
try:
cursor.executemany(upsert_query, records)
except Exception as e:
print "Error in executing save_df_to_db: ", e.message
data = [...] # initial data
rdd = sc.parallelize(data)
rdd = ... # Some simple RDD transforms...
rdd.foreachPartition(save_df_to_db)
table 也有关于 url+标签唯一性的约束。我正在寻找提高这段代码速度的解决方案。欢迎任何建议或推荐。
我认为主要瓶颈是 cursor.executemany
和 connection.autocommit
的组合。正如 executemany
的官方文档中所解释的那样
In its current implementation this method is not faster than executing han executing execute()
in a loop.
由于您将它与 connection.autocommit
结合使用,因此您可以在每次插入后有效地提交。
Psycopg 提供 fast execution helpers:
可用于执行批处理操作。手动处理提交也更有意义。
您也可以额外限制具有大量并发写入和索引更新的数据库服务器。通常我会建议写入磁盘并使用 COPY
执行批量导入,但不能保证在这里有帮助。
由于您使用的是不带时间戳的可变记录,因此您不能仅删除索引并在导入后重新创建它作为提高性能的另一种方式。
感谢您的回复。由于我使用的 psycopg2 版本不支持批量执行,因此我不得不依靠使用复制命令的稍微不同的方法。我写下了一个小函数,它有助于将保存时间从 20 分钟减少到大约 30 秒。这是功能。它需要一个 pandas 数据帧作为输入并将其写入 table (curso):
import StringIO
import pandas as pd
def write_dataframe_to_table(cursor, table, dataframe, batch_size=100, null='None'):
"""
Write a pandas dataframe into a postgres table.
It only works if the table columns have the same name as the dataframe columns.
:param cursor: the psycopg2 cursor object
:param table: the table name
:param dataframe: the dataframe
:param batch_size: batch size
:param null: textual representation of NULL in the file. The default is the string None.
"""
for i in range(0, len(dataframe), batch_size):
chunk_df = dataframe[i: batch_size + i]
content = "\n".join(chunk_df.apply(lambda x: "\t".join(map(str, x)), axis=1))
cursor.copy_from(StringIO.StringIO(content), table, columns=list(chunk_df.columns), null=null)
我有一个 Spark 作业,它处理数据的速度非常快,但是当它试图将结果写入 postgresql 数据库时,速度非常慢。以下是大部分相关代码:
import psycopg2
def save_df_to_db(records):
# each item in record is a dictionary with 'url', 'tag', 'value' as keys
db_conn = psycopg2.connect(connect_string)
db_conn.autocommit = True
cur = db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
upsert_query = """INSERT INTO mytable (url, tag, value)
VALUES (%(url)s, %(tag)s, %(value)s) ON CONFLICT (url, tag) DO UPDATE SET value = %(value)s"""
try:
cursor.executemany(upsert_query, records)
except Exception as e:
print "Error in executing save_df_to_db: ", e.message
data = [...] # initial data
rdd = sc.parallelize(data)
rdd = ... # Some simple RDD transforms...
rdd.foreachPartition(save_df_to_db)
table 也有关于 url+标签唯一性的约束。我正在寻找提高这段代码速度的解决方案。欢迎任何建议或推荐。
我认为主要瓶颈是 cursor.executemany
和 connection.autocommit
的组合。正如 executemany
In its current implementation this method is not faster than executing han executing
execute()
in a loop.
由于您将它与 connection.autocommit
结合使用,因此您可以在每次插入后有效地提交。
Psycopg 提供 fast execution helpers:
可用于执行批处理操作。手动处理提交也更有意义。
您也可以额外限制具有大量并发写入和索引更新的数据库服务器。通常我会建议写入磁盘并使用 COPY
执行批量导入,但不能保证在这里有帮助。
由于您使用的是不带时间戳的可变记录,因此您不能仅删除索引并在导入后重新创建它作为提高性能的另一种方式。
感谢您的回复。由于我使用的 psycopg2 版本不支持批量执行,因此我不得不依靠使用复制命令的稍微不同的方法。我写下了一个小函数,它有助于将保存时间从 20 分钟减少到大约 30 秒。这是功能。它需要一个 pandas 数据帧作为输入并将其写入 table (curso):
import StringIO
import pandas as pd
def write_dataframe_to_table(cursor, table, dataframe, batch_size=100, null='None'):
"""
Write a pandas dataframe into a postgres table.
It only works if the table columns have the same name as the dataframe columns.
:param cursor: the psycopg2 cursor object
:param table: the table name
:param dataframe: the dataframe
:param batch_size: batch size
:param null: textual representation of NULL in the file. The default is the string None.
"""
for i in range(0, len(dataframe), batch_size):
chunk_df = dataframe[i: batch_size + i]
content = "\n".join(chunk_df.apply(lambda x: "\t".join(map(str, x)), axis=1))
cursor.copy_from(StringIO.StringIO(content), table, columns=list(chunk_df.columns), null=null)