如何将数据写入 Redshift,这是在 Python 中创建的数据帧的结果?

How to write data to Redshift that is a result of a dataframe created in Python?

我在 Python 中有一个数据框。我可以将此数据作为新 table 写入 Redshift 吗? 我已成功创建到 Redshift 的数据库连接,并且能够执行简单的 sql 查询。 现在我需要给它写一个数据框。

就本次对话而言,Postgres = RedShift 您有两个选择:

选项 1:

来自Pandas: http://pandas.pydata.org/pandas-docs/stable/io.html#io-sql

pandas.io.sql 模块提供了一组查询包装器,以促进数据检索并减少对特定数据库的依赖 API。如果已安装,则由 SQLAlchemy 提供数据库抽象。此外,您还需要一个用于数据库的驱动程序库。此类驱动程序的示例是用于 PostgreSQL 的 psycopg2 或用于 MySQL.

的 pymysql

写入数据帧

假设以下数据是DataFrame数据,我们可以使用to_sql()将其插入数据库。

id  Date    Col_1   Col_2   Col_3
26  2012-10-18  X   25.7    True
42  2012-10-19  Y   -12.4   False
63  2012-10-20  Z   5.73    True

In [437]: data.to_sql('data', engine)

对于某些数据库,写入大型 DataFrame 可能会由于超出数据包大小限制而导致错误。这可以通过在调用 to_sql 时设置 chunksize 参数来避免。例如,以下将数据以每次 1000 行的批次写入数据库:

In [438]: data.to_sql('data_chunked', engine, chunksize=1000)

选项 2

或者你可以自己做 如果您有一个名为 data 的数据框,只需使用 iterrows:

对其进行循环即可
for row in data.iterrows():

然后将每一行添加到您的数据库中。我会为每一行使用复制而不是插入,因为它会快得多。

http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from

您可以使用 to_sql 将数据推送到 Redshift 数据库。我已经能够通过 SQLAlchemy 引擎使用与我的数据库的连接来执行此操作。请务必在 to_sql 调用中设置 index = False。如果 table 不存在,将创建它,您可以指定是否要调用来替换 table、追加到 table,或者如果 [=23= 则失败=] 已经存在。

from sqlalchemy import create_engine
import pandas as pd

conn = create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')

df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])

df.to_sql('your_table', conn, index=False, if_exists='replace')

请注意,您可能需要 pip install psycopg2 才能通过 SQLAlchemy 连接到 Redshift。

to_sql Documentation

假设您可以访问 S3,这种方法应该有效:

第 1 步:将 DataFrame 作为 csv 写入 S3(我为此使用 AWS SDK boto3)
第 2 步:您从 DataFrame 中知道 Redshift table 的列、数据类型和 key/index,因此您应该能够生成一个 create table 脚本并将其推送到 Redshift 以创建一个空 table
第 3 步:从您的 Python 环境向 Redshift 发送 copy 命令,将数据从 S3 复制到在第 2 步

中创建的空 table

每次都很有魅力。

第 4 步:在您的云存储人员开始对您大喊大叫之前,请从 S3

中删除 csv 如果您发现自己多次执行此操作,将所有四个步骤包装在一个函数中可以保持整洁。

我尝试使用 pandas df.to_sql(),但速度非常慢。插入 50 行花了我 10 多分钟。请参阅 this 未决问题(撰写本文时)

我尝试使用来自 blaze 生态系统的 odo(根据问题讨论中的建议),但遇到了一个 ProgrammingError,我没有费心去调查。

终于成功了:

import psycopg2

# Fill in the blanks for the conn object
conn = psycopg2.connect(user = 'user',
                              password = 'password',
                              host = 'host',
                              dbname = 'db',
                              port = 666)
cursor = conn.cursor()

# Adjust ... according to number of columns
args_str = b','.join(cursor.mogrify("(%s,%s,...)", x) for x in tuple(map(tuple,np_data)))
cursor.execute("insert into table (a,b,...) VALUES "+args_str.decode("utf-8"))

cursor.close()
conn.commit()
conn.close()

是的,很普通 psycopg2。这是一个 numpy 数组,但从 df 转换为 ndarray 应该不会太困难。这给了我大约 3k rows/minute.

但是,根据其他队友的建议,最快的解决方案是在将数据帧作为 TSV/CSV 转储到 S3 集群中然后复制过来之后使用 COPY 命令。如果您要复制非常庞大的数据集,则应该对此进行调查。 (如果我尝试了,我会在这里更新)

import pandas_redshift as pr

pr.connect_to_redshift(dbname = <dbname>,
                        host = <host>,
                        port = <port>,
                        user = <user>,
                        password = <password>)

pr.connect_to_s3(aws_access_key_id = <aws_access_key_id>,
                aws_secret_access_key = <aws_secret_access_key>,
                bucket = <bucket>,
                subdirectory = <subdirectory>)

# Write the DataFrame to S3 and then to redshift
pr.pandas_to_redshift(data_frame = data_frame,
                        redshift_table_name = 'gawronski.nba_shots_log')

详情:https://github.com/agawronski/pandas_redshift

我曾经依赖pandas to_sql() 功能,但它太慢了。我最近切换到执行以下操作:

import pandas as pd
import s3fs # great module which allows you to read/write to s3 easily
import sqlalchemy

df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])

s3 = s3fs.S3FileSystem(anon=False)
filename = 'my_s3_bucket_name/file.csv'
with s3.open(filename, 'w') as f:
    df.to_csv(f, index=False, header=False)

con = sqlalchemy.create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase')
# make sure the schema for mytable exists

# if you need to delete the table but not the schema leave DELETE mytable
# if you want to only append, I think just removing the DELETE mytable would work

con.execute("""
    DELETE mytable;
    COPY mytable
    from 's3://%s'
    iam_role 'arn:aws:iam::xxxx:role/role_name'
    csv;""" % filename)

该角色必须允许 redshift 访问 S3 有关详细信息,请参阅 here

我发现对于一个 300KB 的文件(12000x2 数据帧),这需要 4 秒,而我使用 pandas to_sql() 函数

需要 8 分钟

鉴于所有答案都无法解决我的查询,所以我用谷歌搜索并获得了以下代码片段,它在 2 分钟内完成了工作。我在 windows.

上使用 Python 3.8.5
from red_panda import RedPanda
import pandas as pd
df = pd.read_csv('path_to_read_csv_file')
redshift_conf = {
    "user": "username",
    "password": "password",
    "host": "hostname",
    "port": port number in integer,
    "dbname": "dbname",
}

aws_conf = {
    "aws_access_key_id": "<access_key>",
    "aws_secret_access_key": "<secret_key>",
    # "aws_session_token": "temporary-token-if-you-have-one",
}

rp = RedPanda(redshift_conf, aws_conf)
s3_bucket = "bucketname"
s3_path = "subfolder if any" # optional, if you don't have any sub folders
s3_file_name = "filename" # optional, randomly generated if not provided
rp.df_to_redshift(df, "table_name", bucket=s3_bucket, path=s3_path, append=False)

有关更多信息,请查看 github here

上的软件包