将数据从 Pandas 存储到 Snowflake 的最佳方式
Optimal way to store data from Pandas to Snowflake
数据框很大(7-8 百万行)。尝试 to_sql 和 chunksize = 5000 但它从未完成。
正在使用,
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
df.to_sql(snowflake_table , engine, if_exists='replace', index=False, index_label=None, chunksize=20000)
将数据从 Pandas DF 存储到 SF 的其他最佳解决方案是什么?或者我在这里做错了什么? DF 的大小通常为 7-1000 万行。
对于使用 SQLAlchemy,您是否也可以在连接参数中添加绑定数据的 paramstyle=qmark
。这也在这里引用:https://github.com/snowflakedb/snowflake-connector-python/issues/37#issuecomment-365503841
此更改后,如果您觉得合适,最好对 SQLAlchemy 方法和将大型 DF 写入文件的批量加载方法进行性能比较,并使用 COPY INTO 将文件加载到 Snowflake table。
ilja-everila 指出的最佳方式是“复制到...”,因为 SF 要求在转换之前将 csv 存储在云端,我犹豫要不要这样做,但似乎这是唯一的选择650 万条记录的性能在 5-10 分钟内。
我能想到的最不痛苦的方法是将文件转储到 S3
并让 Snowpipe 自动将其加载到 Snowflake 中。通过该设置,您根本不必执行任何复制命令或进行任何 Snowflake 调用。
有关如何设置 Snowpipe for S3 的详细信息,请参阅 Snowflake 文档。简而言之,您需要创建一个阶段、一个目标 table、一种文件格式(我想您已经准备好了这些东西)和一个管道。然后为管道将监听的存储桶设置 SQS 通知。
Snowflake suggests 文件大小约为 10-100 MB,因此拆分文件可能是个好主意。
# set up credentials (s3fs is built on BOTO hence this is AWS specific)
fs = s3fs.S3FileSystem(key=key, secret=secret)
# number of files to split into
n_chunks = 2
# loop over dataframe and dump chunk by chunk to S3
# (you likely want to expand file naming logic to avoid overwriting existing files)
for f_name, chunks in enumerate(np.array_split(np.arange(df.shape[0]), n_chunks)):
bytes_to_write = df.iloc[chunks].to_csv(index=False).encode()
with fs.open('s3://mybucket/test/dummy_{}.csv'.format(f_name), 'wb') as f:
f.write(bytes_to_write)
作为参考,我尝试将 7M 行数据帧拆分为 5 个大约 40 MB 的文件。从开始拆分数据帧到所有行都到达 Snowflake 大约用了 3 分 40 秒。
pandas 在幕后执行具有多个值的 'insert into ...'。 Snowflake 最多限制摄取 16384 条记录。请更改您的 chunksize=16384.
Snowflake 提供 the write_pandas
and pd_writer
helper functions 来管理:
from snowflake.connector.pandas_tools import pd_writer
df.to_sql(snowflake_table, engine, index=False, method=pd_writer)
# ^ here
pd_writer()
函数使用write_pandas()
:
write_pandas(): Writes a Pandas DataFrame to a table in a Snowflake database
To write the data to the table, the function saves the data to Parquet files, uses the PUT command to upload these files to a temporary stage, and uses the COPY INTO command to copy the data from the files to the table.
数据框很大(7-8 百万行)。尝试 to_sql 和 chunksize = 5000 但它从未完成。
正在使用,
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
df.to_sql(snowflake_table , engine, if_exists='replace', index=False, index_label=None, chunksize=20000)
将数据从 Pandas DF 存储到 SF 的其他最佳解决方案是什么?或者我在这里做错了什么? DF 的大小通常为 7-1000 万行。
对于使用 SQLAlchemy,您是否也可以在连接参数中添加绑定数据的 paramstyle=qmark
。这也在这里引用:https://github.com/snowflakedb/snowflake-connector-python/issues/37#issuecomment-365503841
此更改后,如果您觉得合适,最好对 SQLAlchemy 方法和将大型 DF 写入文件的批量加载方法进行性能比较,并使用 COPY INTO 将文件加载到 Snowflake table。
ilja-everila 指出的最佳方式是“复制到...”,因为 SF 要求在转换之前将 csv 存储在云端,我犹豫要不要这样做,但似乎这是唯一的选择650 万条记录的性能在 5-10 分钟内。
我能想到的最不痛苦的方法是将文件转储到 S3
并让 Snowpipe 自动将其加载到 Snowflake 中。通过该设置,您根本不必执行任何复制命令或进行任何 Snowflake 调用。
有关如何设置 Snowpipe for S3 的详细信息,请参阅 Snowflake 文档。简而言之,您需要创建一个阶段、一个目标 table、一种文件格式(我想您已经准备好了这些东西)和一个管道。然后为管道将监听的存储桶设置 SQS 通知。
Snowflake suggests 文件大小约为 10-100 MB,因此拆分文件可能是个好主意。
# set up credentials (s3fs is built on BOTO hence this is AWS specific)
fs = s3fs.S3FileSystem(key=key, secret=secret)
# number of files to split into
n_chunks = 2
# loop over dataframe and dump chunk by chunk to S3
# (you likely want to expand file naming logic to avoid overwriting existing files)
for f_name, chunks in enumerate(np.array_split(np.arange(df.shape[0]), n_chunks)):
bytes_to_write = df.iloc[chunks].to_csv(index=False).encode()
with fs.open('s3://mybucket/test/dummy_{}.csv'.format(f_name), 'wb') as f:
f.write(bytes_to_write)
作为参考,我尝试将 7M 行数据帧拆分为 5 个大约 40 MB 的文件。从开始拆分数据帧到所有行都到达 Snowflake 大约用了 3 分 40 秒。
pandas 在幕后执行具有多个值的 'insert into ...'。 Snowflake 最多限制摄取 16384 条记录。请更改您的 chunksize=16384.
Snowflake 提供 the write_pandas
and pd_writer
helper functions 来管理:
from snowflake.connector.pandas_tools import pd_writer
df.to_sql(snowflake_table, engine, index=False, method=pd_writer)
# ^ here
pd_writer()
函数使用write_pandas()
:
write_pandas(): Writes a Pandas DataFrame to a table in a Snowflake database
To write the data to the table, the function saves the data to Parquet files, uses the PUT command to upload these files to a temporary stage, and uses the COPY INTO command to copy the data from the files to the table.