使用 SNOWPIPE 时如何清除 Stage 或从 Stage 中删除文件?
How to PURGE Stage or delete files from Stage when using SNOWPIPE?
Snowflake 提供 Snowpipe 以在阶段中可用时立即将数据复制到 Table,但它缺少清除选项。
还有其他方法可以实现吗?
在 Snowpipe 的情况下没有直接的方法来实现清除,但可以通过 Snowpipe、[=73 的组合来实现=]流和任务
假设我们有要加载的数据文件驻留在 GCS 存储桶中
步骤 1:使用外部阶段在 Snowflake 上创建 Snowpipe
参考这个Documentation
// Create a Staging Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE (COL1 STRING);
// Create Destination Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE (COL1 STRING);
// Create an External Stage
CREATE STAGE SNOWPIPE_DB.PUBLIC.GCP_STAGE
URL='gcs://bucket/files/'
STORAGE_INTEGRATION = '<STORAGE_INTEGRATION>';
// Create Snowpipe
CREATE PIPE SNOWPIPE_DB.PUBLIC.GCP_Pipe
AUTO_INGEST = true
INTEGRATION = '<NOTIFICATION_INTEGRATION>'
AS
COPY INTO SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
FROM @SNOWPIPE_DB.PUBLIC.GCP_STAGE;
步骤 2: 在 Table GCP_STAGE_TABLE
上创建流
流记录对 table 所做的数据操作语言 (DML) 更改,包括有关插入、更新和删除的信息。
参考这个Documentation
// Create Stream in APPEND_ONLY Mode since we are concerned with INSERTS only
CREATE OR REPLACE STREAM SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM
ON TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
APPEND_ONLY = TRUE;
现在,每当在 GCS Bucket 上上传一些数据时,GCP_STAGE_TABLE
就会被 Snowpipe 填充,我们的流也是如此 RESPONSES_STREAM
RESPONSES_STREAM
看起来像这样
COL1
METADATA$ACTION
METADATA$ISUPDATE
METADATA$ROW_ID
MOHAMMED
INSERT
FALSE
kjee941e66d4ca4hhh1e2b8ddba12c9c905a829
TURKY
INSERT
FALSE
b7c5uytba6c1jhhfb6e9d85e3d3cfd7249192b0d8
由于 Stream 有 APPEND_ONLY
模式,我们只会在 METADATA$ACTION
中看到 INSERT
第 3 步: 创建程序以 PURGE
舞台并填充 GCP_DESTINATION_TABLE
// Create a Procedure
CREATE OR REPLACE Load_Data()
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
AS
$$
var purgeStage = `REMOVE @SNOWPIPE_DB.PUBLIC.GCP_STAGE`;
var populateTable = `INSERT INTO SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE
SELECT * FROM RESPONSES_STREAM`;
try {
snowflake.execute ( {sqlText: purgeStage} );
snowflake.execute ( {sqlText: populateTable} );
return "Succeeded.";
}
catch (err) {
return "Failed: " + err;
}
$$
以上过程使用 REMOVE
命令清除舞台并填充 Table GCP_DESTINATION_TABLE
.
从流 RESPONSES_STREAM
中填充 Table GCP_DESTINATION_TABLE
清除流。
步骤 4: 创建一个任务来调用过程 Load_Data()
参考这个Documentation
我们创建一个间隔为 5 分钟的任务,它首先检查流 RESPONSES_STREAM
是否有任何加载到 GCP_STAGE_TABLE
的数据,如果为真,则执行过程 Load_Data()
// Task DDL
CREATE OR REPLACE TASK MASTER_TASK
WAREHOUSE = LOAD_WH
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM') //Checks the stream for Data
AS
CALL Load_Data();
当数据加载到 GCP_STAGE_TABLE
时,SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM')
计算结果为 True,然后使任务执行过程调用。
尽管该过程不是每 5 分钟调用一次,但值得注意的是 WHEN SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM')
确实会消耗一分钟的计算资源,为减少这种情况,可以将频率从 5 分钟更改为更长的持续时间。
为了使它成为 ELT 任务,过程可以有一些转换逻辑,并且可以制作任务树。
注:
REMOVE
没有正式支持外部阶段,但它对我来说仍然适用于 GCS Bucket。
- 让我知道它是否适用于 AWS S3 和 Azure。
Snowflake 提供 Snowpipe 以在阶段中可用时立即将数据复制到 Table,但它缺少清除选项。
还有其他方法可以实现吗?
在 Snowpipe 的情况下没有直接的方法来实现清除,但可以通过 Snowpipe、[=73 的组合来实现=]流和任务
假设我们有要加载的数据文件驻留在 GCS 存储桶中
步骤 1:使用外部阶段在 Snowflake 上创建 Snowpipe
参考这个Documentation
// Create a Staging Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE (COL1 STRING);
// Create Destination Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE (COL1 STRING);
// Create an External Stage
CREATE STAGE SNOWPIPE_DB.PUBLIC.GCP_STAGE
URL='gcs://bucket/files/'
STORAGE_INTEGRATION = '<STORAGE_INTEGRATION>';
// Create Snowpipe
CREATE PIPE SNOWPIPE_DB.PUBLIC.GCP_Pipe
AUTO_INGEST = true
INTEGRATION = '<NOTIFICATION_INTEGRATION>'
AS
COPY INTO SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
FROM @SNOWPIPE_DB.PUBLIC.GCP_STAGE;
步骤 2: 在 Table GCP_STAGE_TABLE
流记录对 table 所做的数据操作语言 (DML) 更改,包括有关插入、更新和删除的信息。
参考这个Documentation
// Create Stream in APPEND_ONLY Mode since we are concerned with INSERTS only
CREATE OR REPLACE STREAM SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM
ON TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
APPEND_ONLY = TRUE;
现在,每当在 GCS Bucket 上上传一些数据时,GCP_STAGE_TABLE
就会被 Snowpipe 填充,我们的流也是如此 RESPONSES_STREAM
RESPONSES_STREAM
看起来像这样
COL1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
---|---|---|---|
MOHAMMED | INSERT | FALSE | kjee941e66d4ca4hhh1e2b8ddba12c9c905a829 |
TURKY | INSERT | FALSE | b7c5uytba6c1jhhfb6e9d85e3d3cfd7249192b0d8 |
由于 Stream 有 APPEND_ONLY
模式,我们只会在 METADATA$ACTION
INSERT
第 3 步: 创建程序以 PURGE
舞台并填充 GCP_DESTINATION_TABLE
// Create a Procedure
CREATE OR REPLACE Load_Data()
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
AS
$$
var purgeStage = `REMOVE @SNOWPIPE_DB.PUBLIC.GCP_STAGE`;
var populateTable = `INSERT INTO SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE
SELECT * FROM RESPONSES_STREAM`;
try {
snowflake.execute ( {sqlText: purgeStage} );
snowflake.execute ( {sqlText: populateTable} );
return "Succeeded.";
}
catch (err) {
return "Failed: " + err;
}
$$
以上过程使用 REMOVE
命令清除舞台并填充 Table GCP_DESTINATION_TABLE
.
从流 RESPONSES_STREAM
中填充 Table GCP_DESTINATION_TABLE
清除流。
步骤 4: 创建一个任务来调用过程 Load_Data()
参考这个Documentation
我们创建一个间隔为 5 分钟的任务,它首先检查流 RESPONSES_STREAM
是否有任何加载到 GCP_STAGE_TABLE
的数据,如果为真,则执行过程 Load_Data()
// Task DDL
CREATE OR REPLACE TASK MASTER_TASK
WAREHOUSE = LOAD_WH
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM') //Checks the stream for Data
AS
CALL Load_Data();
当数据加载到 GCP_STAGE_TABLE
时,SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM')
计算结果为 True,然后使任务执行过程调用。
尽管该过程不是每 5 分钟调用一次,但值得注意的是 WHEN SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM')
确实会消耗一分钟的计算资源,为减少这种情况,可以将频率从 5 分钟更改为更长的持续时间。
为了使它成为 ELT 任务,过程可以有一些转换逻辑,并且可以制作任务树。
注:
REMOVE
没有正式支持外部阶段,但它对我来说仍然适用于 GCS Bucket。- 让我知道它是否适用于 AWS S3 和 Azure。