Snowflake to/from 针对 ETL 架构的 S3 管道建议

Snowflake to/from S3 Pipeline Recommendations for ETL architecture

我正在尝试构建一个管道,该管道将数据从 Snowflake 发送到 S3,然后从 S3 返回到 Snowflake(在 运行 它通过 Sagemaker 上的生产 ML 模型之后)。我是数据工程的新手,所以我很想听听社区推荐的路径是什么。管道要求如下:

  1. 我想安排一份月度工作。我是在 AWS 中指定还是在 Snowflake 方面指定?
  2. 对于初始拉取,我想从 Snowflake 查询 12 个月的数据。但是,对于任何后续拉动,我只需要最后一个月,因为这应该是每月管道。
  3. 所有每月提取的数据都应像这样存储在自己的 S3 子文件夹中 query_01012020,query_01022020,query_01032020
  4. ML 模型在 Sagemaker 中成功对数据进行评分后,应触发从 S3 返回指定雪花 table 的数据。
  5. 我想监控 ML 模型在生产加班中的性能,以了解模型是否正在降低其准确性(也许是一些类似校准的图表)。
  6. 我想在管道中出现问题时实时收到任何错误通知。

我希望你能在相关 documentation/tutorials 方面为我提供指导。非常感谢您的指导。

非常感谢。

Snowflake 没有像 Airflow 或 Oozie 这样的编排工具。因此,您需要使用或考虑使用一些 Snowflake 合作伙伴生态系统工具,如 Mattilion 等。或者,您可以使用 Spark 或 python 或任何其他可以使用 JDBC/ODBC/Python 连接器。

要将数据从 s3 实时提供给雪花,您可以使用 AWS SNS 服务并调用 SnowPipe 将数据提供给 Snowflake Stage 环境,并通过 ETL 过程将其转发以供使用。

回答您的每一个问题

I am looking to schedule a monthly job. Do I specify such in AWS or on the Snowflake side?

在 snowflake 中是不可能的,你必须通过 AWS 或其他工具来完成。

For the initial pull, I want to query 12 months' worth of data from Snowflake. However, for any subsequent pull, I only need the last month since this should be a monthly pipeline.

Ans:您可以提取任何大小的数据,您也可以通过 SF 使用一些脚本来支持它,但需要对调用进行编程。

All monthly data pulls should be stored in own S3 subfolder like this query_01012020,query_01022020,query_01032020 etc.

回答:可以通过 AWS SNS(或 REST API)+ SnowPipe 向 Snowflake 提供数据,但反之亦然。

ML 模型在 Sagemaker 中成功对数据进行评分后,应触发从 S3 返回指定雪花 table 的数据。

回答:这可以通过 AWS SNS + SnowPipe 实现。

I want to monitor the performance of the ML model in production overtime to catch if the model is decreasing its accuracy (some calibration-like graph perhaps).

回答:无法通过 Snowflake。

我会这样处理问题:

临时 table 保存 12 个月的数据(我相信您知道所有必需的查询,因为您要求教程,我认为它可能对您和其他人都有帮助)

-- Initial Pull Hold 12 months of Data .... 
Drop table if exists <TABLE_NAME>; 
Create Temporary Table <TABLE_NAME> as (
Select * 
From Original Table 
Where date_field between current_date -365 and Current_date 
); 

-- Export data to S3 ... 
copy into 's3://path/to/export/directory'
from DB_NAME.SCHEMA_NAME.TABLE_NAME
file_format = (type = csv field_delimiter = '|' skip_header = 0)
credentials=(aws_key_id='your_aws_key_id' aws_secret_key='your_aws_secret_key');

完成 ML 工作后,像这样将数据导入回雪花:

-- Import to S3 ... 
copy into DB_NAME.SCHEMA_NAME.TABLE_NAME
from 's3://path/to/your/csv_file_name.csv'
credentials=(aws_key_id='your_aws_key_id' aws_secret_key='your_aws_secret_key')
file_format = (type = csv field_delimiter = '|' skip_header = 1); 

我不确定 snowflake 是否已经发布了 ML 的东西,以及你将如何在你这边做 ML 等等。

对于日程安排,我建议:

  1. 将您的代码放在 shell 脚本或 python 脚本中,并每月安排一次 运行。

  2. 使用雪花任务如下:

创建任务monthly_task_1 仓库 = 时间表 = 'USING CRON 0 0 1 * * America/Chicago' 作为 在此处插入您创建的临时 table 查询

创建任务monthly_task_2 仓库 = 在 monthly_task_1 之后 作为 在此处插入您的 S3 导出查询

您可以在此处阅读有关雪花任务的更多信息:https://docs.snowflake.com/en/sql-reference/sql/create-task.html

为了在 ML 完成后将结果从 S3 导入回 Snowflake,您可以在 ML 代码中添加几行(大概在 Python 中)以执行复制到代码中的 --Import to S3 这写在上面。