如何将数据写入 Redshift,这是在 Python 中创建的数据帧的结果?
How to write data to Redshift that is a result of a dataframe created in Python?
我在 Python 中有一个数据框。我可以将此数据作为新 table 写入 Redshift 吗?
我已成功创建到 Redshift 的数据库连接,并且能够执行简单的 sql 查询。
现在我需要给它写一个数据框。
就本次对话而言,Postgres = RedShift
您有两个选择:
选项 1:
来自Pandas:
http://pandas.pydata.org/pandas-docs/stable/io.html#io-sql
pandas.io.sql 模块提供了一组查询包装器,以促进数据检索并减少对特定数据库的依赖 API。如果已安装,则由 SQLAlchemy 提供数据库抽象。此外,您还需要一个用于数据库的驱动程序库。此类驱动程序的示例是用于 PostgreSQL 的 psycopg2 或用于 MySQL.
的 pymysql
写入数据帧
假设以下数据是DataFrame数据,我们可以使用to_sql()将其插入数据库。
id Date Col_1 Col_2 Col_3
26 2012-10-18 X 25.7 True
42 2012-10-19 Y -12.4 False
63 2012-10-20 Z 5.73 True
In [437]: data.to_sql('data', engine)
对于某些数据库,写入大型 DataFrame 可能会由于超出数据包大小限制而导致错误。这可以通过在调用 to_sql 时设置 chunksize 参数来避免。例如,以下将数据以每次 1000 行的批次写入数据库:
In [438]: data.to_sql('data_chunked', engine, chunksize=1000)
选项 2
或者你可以自己做
如果您有一个名为 data 的数据框,只需使用 iterrows:
对其进行循环即可
for row in data.iterrows():
然后将每一行添加到您的数据库中。我会为每一行使用复制而不是插入,因为它会快得多。
http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from
您可以使用 to_sql
将数据推送到 Redshift 数据库。我已经能够通过 SQLAlchemy 引擎使用与我的数据库的连接来执行此操作。请务必在 to_sql
调用中设置 index = False
。如果 table 不存在,将创建它,您可以指定是否要调用来替换 table、追加到 table,或者如果 [=23= 则失败=] 已经存在。
from sqlalchemy import create_engine
import pandas as pd
conn = create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
df.to_sql('your_table', conn, index=False, if_exists='replace')
请注意,您可能需要 pip install psycopg2
才能通过 SQLAlchemy 连接到 Redshift。
假设您可以访问 S3,这种方法应该有效:
第 1 步:将 DataFrame 作为 csv 写入 S3(我为此使用 AWS SDK boto3)
第 2 步:您从 DataFrame 中知道 Redshift table 的列、数据类型和 key/index,因此您应该能够生成一个 create table
脚本并将其推送到 Redshift 以创建一个空 table
第 3 步:从您的 Python 环境向 Redshift 发送 copy
命令,将数据从 S3 复制到在第 2 步
中创建的空 table
每次都很有魅力。
第 4 步:在您的云存储人员开始对您大喊大叫之前,请从 S3
中删除 csv
如果您发现自己多次执行此操作,将所有四个步骤包装在一个函数中可以保持整洁。
我尝试使用 pandas df.to_sql()
,但速度非常慢。插入 50 行花了我 10 多分钟。请参阅 this 未决问题(撰写本文时)
我尝试使用来自 blaze 生态系统的 odo
(根据问题讨论中的建议),但遇到了一个 ProgrammingError
,我没有费心去调查。
终于成功了:
import psycopg2
# Fill in the blanks for the conn object
conn = psycopg2.connect(user = 'user',
password = 'password',
host = 'host',
dbname = 'db',
port = 666)
cursor = conn.cursor()
# Adjust ... according to number of columns
args_str = b','.join(cursor.mogrify("(%s,%s,...)", x) for x in tuple(map(tuple,np_data)))
cursor.execute("insert into table (a,b,...) VALUES "+args_str.decode("utf-8"))
cursor.close()
conn.commit()
conn.close()
是的,很普通 psycopg2
。这是一个 numpy 数组,但从 df
转换为 ndarray
应该不会太困难。这给了我大约 3k rows/minute.
但是,根据其他队友的建议,最快的解决方案是在将数据帧作为 TSV/CSV 转储到 S3 集群中然后复制过来之后使用 COPY 命令。如果您要复制非常庞大的数据集,则应该对此进行调查。 (如果我尝试了,我会在这里更新)
import pandas_redshift as pr
pr.connect_to_redshift(dbname = <dbname>,
host = <host>,
port = <port>,
user = <user>,
password = <password>)
pr.connect_to_s3(aws_access_key_id = <aws_access_key_id>,
aws_secret_access_key = <aws_secret_access_key>,
bucket = <bucket>,
subdirectory = <subdirectory>)
# Write the DataFrame to S3 and then to redshift
pr.pandas_to_redshift(data_frame = data_frame,
redshift_table_name = 'gawronski.nba_shots_log')
我曾经依赖pandas to_sql()
功能,但它太慢了。我最近切换到执行以下操作:
import pandas as pd
import s3fs # great module which allows you to read/write to s3 easily
import sqlalchemy
df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
s3 = s3fs.S3FileSystem(anon=False)
filename = 'my_s3_bucket_name/file.csv'
with s3.open(filename, 'w') as f:
df.to_csv(f, index=False, header=False)
con = sqlalchemy.create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
# make sure the schema for mytable exists
# if you need to delete the table but not the schema leave DELETE mytable
# if you want to only append, I think just removing the DELETE mytable would work
con.execute("""
DELETE mytable;
COPY mytable
from 's3://%s'
iam_role 'arn:aws:iam::xxxx:role/role_name'
csv;""" % filename)
该角色必须允许 redshift 访问 S3 有关详细信息,请参阅 here
我发现对于一个 300KB 的文件(12000x2 数据帧),这需要 4 秒,而我使用 pandas to_sql()
函数
需要 8 分钟
鉴于所有答案都无法解决我的查询,所以我用谷歌搜索并获得了以下代码片段,它在 2 分钟内完成了工作。我在 windows.
上使用 Python 3.8.5
from red_panda import RedPanda
import pandas as pd
df = pd.read_csv('path_to_read_csv_file')
redshift_conf = {
"user": "username",
"password": "password",
"host": "hostname",
"port": port number in integer,
"dbname": "dbname",
}
aws_conf = {
"aws_access_key_id": "<access_key>",
"aws_secret_access_key": "<secret_key>",
# "aws_session_token": "temporary-token-if-you-have-one",
}
rp = RedPanda(redshift_conf, aws_conf)
s3_bucket = "bucketname"
s3_path = "subfolder if any" # optional, if you don't have any sub folders
s3_file_name = "filename" # optional, randomly generated if not provided
rp.df_to_redshift(df, "table_name", bucket=s3_bucket, path=s3_path, append=False)
有关更多信息,请查看 github here
上的软件包
我在 Python 中有一个数据框。我可以将此数据作为新 table 写入 Redshift 吗? 我已成功创建到 Redshift 的数据库连接,并且能够执行简单的 sql 查询。 现在我需要给它写一个数据框。
就本次对话而言,Postgres = RedShift 您有两个选择:
选项 1:
来自Pandas: http://pandas.pydata.org/pandas-docs/stable/io.html#io-sql
pandas.io.sql 模块提供了一组查询包装器,以促进数据检索并减少对特定数据库的依赖 API。如果已安装,则由 SQLAlchemy 提供数据库抽象。此外,您还需要一个用于数据库的驱动程序库。此类驱动程序的示例是用于 PostgreSQL 的 psycopg2 或用于 MySQL.
的 pymysql写入数据帧
假设以下数据是DataFrame数据,我们可以使用to_sql()将其插入数据库。
id Date Col_1 Col_2 Col_3
26 2012-10-18 X 25.7 True
42 2012-10-19 Y -12.4 False
63 2012-10-20 Z 5.73 True
In [437]: data.to_sql('data', engine)
对于某些数据库,写入大型 DataFrame 可能会由于超出数据包大小限制而导致错误。这可以通过在调用 to_sql 时设置 chunksize 参数来避免。例如,以下将数据以每次 1000 行的批次写入数据库:
In [438]: data.to_sql('data_chunked', engine, chunksize=1000)
选项 2
或者你可以自己做 如果您有一个名为 data 的数据框,只需使用 iterrows:
对其进行循环即可for row in data.iterrows():
然后将每一行添加到您的数据库中。我会为每一行使用复制而不是插入,因为它会快得多。
http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from
您可以使用 to_sql
将数据推送到 Redshift 数据库。我已经能够通过 SQLAlchemy 引擎使用与我的数据库的连接来执行此操作。请务必在 to_sql
调用中设置 index = False
。如果 table 不存在,将创建它,您可以指定是否要调用来替换 table、追加到 table,或者如果 [=23= 则失败=] 已经存在。
from sqlalchemy import create_engine
import pandas as pd
conn = create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
df.to_sql('your_table', conn, index=False, if_exists='replace')
请注意,您可能需要 pip install psycopg2
才能通过 SQLAlchemy 连接到 Redshift。
假设您可以访问 S3,这种方法应该有效:
第 1 步:将 DataFrame 作为 csv 写入 S3(我为此使用 AWS SDK boto3)
第 2 步:您从 DataFrame 中知道 Redshift table 的列、数据类型和 key/index,因此您应该能够生成一个 create table
脚本并将其推送到 Redshift 以创建一个空 table
第 3 步:从您的 Python 环境向 Redshift 发送 copy
命令,将数据从 S3 复制到在第 2 步
每次都很有魅力。
第 4 步:在您的云存储人员开始对您大喊大叫之前,请从 S3
中删除 csv 如果您发现自己多次执行此操作,将所有四个步骤包装在一个函数中可以保持整洁。
我尝试使用 pandas df.to_sql()
,但速度非常慢。插入 50 行花了我 10 多分钟。请参阅 this 未决问题(撰写本文时)
我尝试使用来自 blaze 生态系统的 odo
(根据问题讨论中的建议),但遇到了一个 ProgrammingError
,我没有费心去调查。
终于成功了:
import psycopg2
# Fill in the blanks for the conn object
conn = psycopg2.connect(user = 'user',
password = 'password',
host = 'host',
dbname = 'db',
port = 666)
cursor = conn.cursor()
# Adjust ... according to number of columns
args_str = b','.join(cursor.mogrify("(%s,%s,...)", x) for x in tuple(map(tuple,np_data)))
cursor.execute("insert into table (a,b,...) VALUES "+args_str.decode("utf-8"))
cursor.close()
conn.commit()
conn.close()
是的,很普通 psycopg2
。这是一个 numpy 数组,但从 df
转换为 ndarray
应该不会太困难。这给了我大约 3k rows/minute.
但是,根据其他队友的建议,最快的解决方案是在将数据帧作为 TSV/CSV 转储到 S3 集群中然后复制过来之后使用 COPY 命令。如果您要复制非常庞大的数据集,则应该对此进行调查。 (如果我尝试了,我会在这里更新)
import pandas_redshift as pr
pr.connect_to_redshift(dbname = <dbname>,
host = <host>,
port = <port>,
user = <user>,
password = <password>)
pr.connect_to_s3(aws_access_key_id = <aws_access_key_id>,
aws_secret_access_key = <aws_secret_access_key>,
bucket = <bucket>,
subdirectory = <subdirectory>)
# Write the DataFrame to S3 and then to redshift
pr.pandas_to_redshift(data_frame = data_frame,
redshift_table_name = 'gawronski.nba_shots_log')
我曾经依赖pandas to_sql()
功能,但它太慢了。我最近切换到执行以下操作:
import pandas as pd
import s3fs # great module which allows you to read/write to s3 easily
import sqlalchemy
df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
s3 = s3fs.S3FileSystem(anon=False)
filename = 'my_s3_bucket_name/file.csv'
with s3.open(filename, 'w') as f:
df.to_csv(f, index=False, header=False)
con = sqlalchemy.create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
# make sure the schema for mytable exists
# if you need to delete the table but not the schema leave DELETE mytable
# if you want to only append, I think just removing the DELETE mytable would work
con.execute("""
DELETE mytable;
COPY mytable
from 's3://%s'
iam_role 'arn:aws:iam::xxxx:role/role_name'
csv;""" % filename)
该角色必须允许 redshift 访问 S3 有关详细信息,请参阅 here
我发现对于一个 300KB 的文件(12000x2 数据帧),这需要 4 秒,而我使用 pandas to_sql()
函数
鉴于所有答案都无法解决我的查询,所以我用谷歌搜索并获得了以下代码片段,它在 2 分钟内完成了工作。我在 windows.
上使用 Python 3.8.5from red_panda import RedPanda
import pandas as pd
df = pd.read_csv('path_to_read_csv_file')
redshift_conf = {
"user": "username",
"password": "password",
"host": "hostname",
"port": port number in integer,
"dbname": "dbname",
}
aws_conf = {
"aws_access_key_id": "<access_key>",
"aws_secret_access_key": "<secret_key>",
# "aws_session_token": "temporary-token-if-you-have-one",
}
rp = RedPanda(redshift_conf, aws_conf)
s3_bucket = "bucketname"
s3_path = "subfolder if any" # optional, if you don't have any sub folders
s3_file_name = "filename" # optional, randomly generated if not provided
rp.df_to_redshift(df, "table_name", bucket=s3_bucket, path=s3_path, append=False)
有关更多信息,请查看 github here
上的软件包