STREAM 未从存储过程创建的任务中使用

STREAM not consumed from a task created by a stored procedure

TLDR:当我或我直接创建的任务在其上发出 DML 时,我有流被消耗。但是当它是由存储过程创建的任务时,流不会被消耗。

我有一个流,它的行为符合预期,当我在上面 select 时,我可以看到它有数据: SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');

我使用创建它的相同角色来使用流:

INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);

我又SELECT了SYSTEM$STREAM_HAS_DATA,一切都好它被消耗了。

现在,我将其捆绑到一个任务中:

CREATE TASK IF NOT EXISTS ANALYTICS_DB.schema.table_test
            WAREHOUSE = wh
            SCHEDULE = 'USING CRON * * * * * Etc/UTC'
            COMMENT = 'Checking when was the last time tables got updated'
        WHEN -- conditional check if the stream has new data
            SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
        AS -- same previous query
    INSERT INTO ANALYTICS_DB.schema.table
    (stuff, last_checked, column_max)
    SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(MODIFIED_AT)
    FROM FROM ANALYTICS_DB.schema.stream_name;

大约一分钟后,我再次检查我的流,一切正常,它会在运行按计划进行时消耗流。

创建任务的存储过程。

我的SQL部分:

create PROCEDURE IF NOT EXISTS ANALYTICS_DB.schema.create_tasks()
    returns string
    language javascript
    EXECUTE AS CALLER
as

和 javascript 部分(为了读者的缘故,删去了重要的部分)。它 运行 很好,创建任务,任务 运行 根据计划,发出查询但 流未被消耗 。因此,我的 max() 计算是在不断增长的 table.

上完成的
$$
// trimmed some stuff here getting the data

    while (result_set.next())
    {
        var lagschema = result_set.getColumnValue(1);
        var lagtable = result_set.getColumnValue(2);
        var lagcolumn = result_set.getColumnValue(3);

        var sql_task = `CREATE TASK IF NOT EXISTS schema.ppw_freshness_schema.stream_name
            WAREHOUSE = wh
            SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
            COMMENT = 'Checking when was the last update'
        WHEN 
            SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
        AS
                INSERT INTO ANALYTICS_DB.schema.table
    (stuff, last_checked, column_max)
    SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
    FROM FROM ANALYTICS_DB.schema.stream_name;`;

        var create_task = snowflake.createStatement({sqlText: sql_task});
        create_task.execute();
        var start_task = snowflake.createStatement({sqlText: `ALTER TASK IF EXISTS schema.ppw_freshness_schema.stream_name RESUME;`});
        start_task.execute();
    }
// error handling
    $$;

在下面查看我通过存储过程创建的任务是如何每次 运行ning 的,因为它永远不会清空流。一旦我手动创建相同的任务,它就可以清空流,并最终在没有新数据时跳过 运行s(这是想要的行为)。

没有任何东西可以提供有关这里问题的线索,因为它存在于流本身的命名中。所以完全是我的错误。 最重要的是,我 运行 的测试使用了非常活跃的 table,因此它掩盖了流实际上按预期执行的事实。