使用 SNOWPIPE 时如何清除 Stage 或从 Stage 中删除文件?

How to PURGE Stage or delete files from Stage when using SNOWPIPE?

Snowflake 提供 Snowpipe 以在阶段中可用时立即将数据复制到 Table,但它缺少清除选项。
还有其他方法可以实现吗?

在 Snowpipe 的情况下没有直接的方法来实现清除,但可以通过 Snowpipe、[=73 的组合来实现=]任务

假设我们有要加载的数据文件驻留在 GCS 存储桶中

步骤 1:使用外部阶段在 Snowflake 上创建 Snowpipe
参考这个Documentation

// Create a Staging Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE (COL1 STRING);

// Create Destination Table
CREATE TABLE SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE (COL1 STRING);

// Create an External Stage
CREATE STAGE SNOWPIPE_DB.PUBLIC.GCP_STAGE
  URL='gcs://bucket/files/'
  STORAGE_INTEGRATION = '<STORAGE_INTEGRATION>'; 

// Create Snowpipe
CREATE PIPE SNOWPIPE_DB.PUBLIC.GCP_Pipe
  AUTO_INGEST = true
  INTEGRATION = '<NOTIFICATION_INTEGRATION>'
  AS
  COPY INTO SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
  FROM @SNOWPIPE_DB.PUBLIC.GCP_STAGE;

步骤 2: 在 Table GCP_STAGE_TABLE

上创建流

流记录对 table 所做的数据操作语言 (DML) 更改,包括有关插入、更新和删除的信息。
参考这个Documentation

// Create Stream in APPEND_ONLY Mode since we are concerned with INSERTS only

CREATE OR REPLACE STREAM SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM
  ON TABLE SNOWPIPE_DB.PUBLIC.GCP_STAGE_TABLE
  APPEND_ONLY = TRUE;

现在,每当在 GCS Bucket 上上传一些数据时,GCP_STAGE_TABLE 就会被 Snowpipe 填充,我们的流也是如此 RESPONSES_STREAM

RESPONSES_STREAM 看起来像这样

COL1 METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
MOHAMMED INSERT FALSE kjee941e66d4ca4hhh1e2b8ddba12c9c905a829
TURKY INSERT FALSE b7c5uytba6c1jhhfb6e9d85e3d3cfd7249192b0d8

由于 Stream 有 APPEND_ONLY 模式,我们只会在 METADATA$ACTION

中看到 INSERT

第 3 步: 创建程序以 PURGE 舞台并填充 GCP_DESTINATION_TABLE

// Create a Procedure
CREATE OR REPLACE Load_Data()
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    AS 
    $$
    var purgeStage = `REMOVE @SNOWPIPE_DB.PUBLIC.GCP_STAGE`; 
    var populateTable = `INSERT INTO SNOWPIPE_DB.PUBLIC.GCP_DESTINATION_TABLE
                         SELECT * FROM RESPONSES_STREAM`;
    try {
        snowflake.execute ( {sqlText: purgeStage} );
        snowflake.execute ( {sqlText: populateTable} );
        return "Succeeded."; 
        }
    catch (err)  {
        return "Failed: " + err; 
        }
    $$

以上过程使用 REMOVE 命令清除舞台并填充 Table GCP_DESTINATION_TABLE.
从流 RESPONSES_STREAM 中填充 Table GCP_DESTINATION_TABLE 清除流。

步骤 4: 创建一个任务来调用过程 Load_Data()
参考这个Documentation

我们创建一个间隔为 5 分钟的任务,它首先检查流 RESPONSES_STREAM 是否有任何加载到 GCP_STAGE_TABLE 的数据,如果为真,则执行过程 Load_Data()

// Task DDL
CREATE OR REPLACE TASK MASTER_TASK
    WAREHOUSE  = LOAD_WH
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('SNOWPIPE_DB.PUBLIC.RESPONSES_STREAM') //Checks the stream for Data
AS 
    CALL Load_Data();
当数据加载到 GCP_STAGE_TABLE 时,

SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM') 计算结果为 True,然后使任务执行过程调用。

尽管该过程不是每 5 分钟调用一次,但值得注意的是 WHEN SYSTEM$STREAM_HAS_DATA('RESPONSES_STREAM') 确实会消耗一分钟的计算资源,为减少这种情况,可以将频率从 5 分钟更改为更长的持续时间。

为了使它成为 ELT 任务,过程可以有一些转换逻辑,并且可以制作任务树。

注:

  1. REMOVE 没有正式支持外部阶段,但它对我来说仍然适用于 GCS Bucket。
  2. 让我知道它是否适用于 AWS S3 和 Azure。