STREAM 未从存储过程创建的任务中使用
STREAM not consumed from a task created by a stored procedure
TLDR:当我或我直接创建的任务在其上发出 DML 时,我有流被消耗。但是当它是由存储过程创建的任务时,流不会被消耗。
我有一个流,它的行为符合预期,当我在上面 select 时,我可以看到它有数据:
SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');
我使用创建它的相同角色来使用流:
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);
我又SELECT了SYSTEM$STREAM_HAS_DATA,一切都好它被消耗了。
现在,我将其捆绑到一个任务中:
CREATE TASK IF NOT EXISTS ANALYTICS_DB.schema.table_test
WAREHOUSE = wh
SCHEDULE = 'USING CRON * * * * * Etc/UTC'
COMMENT = 'Checking when was the last time tables got updated'
WHEN -- conditional check if the stream has new data
SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
AS -- same previous query
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(MODIFIED_AT)
FROM FROM ANALYTICS_DB.schema.stream_name;
大约一分钟后,我再次检查我的流,一切正常,它会在运行按计划进行时消耗流。
创建任务的存储过程。
我的SQL部分:
create PROCEDURE IF NOT EXISTS ANALYTICS_DB.schema.create_tasks()
returns string
language javascript
EXECUTE AS CALLER
as
和 javascript 部分(为了读者的缘故,删去了重要的部分)。它 运行 很好,创建任务,任务 运行 根据计划,发出查询但 流未被消耗 。因此,我的 max() 计算是在不断增长的 table.
上完成的
$$
// trimmed some stuff here getting the data
while (result_set.next())
{
var lagschema = result_set.getColumnValue(1);
var lagtable = result_set.getColumnValue(2);
var lagcolumn = result_set.getColumnValue(3);
var sql_task = `CREATE TASK IF NOT EXISTS schema.ppw_freshness_schema.stream_name
WAREHOUSE = wh
SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
COMMENT = 'Checking when was the last update'
WHEN
SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
AS
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM FROM ANALYTICS_DB.schema.stream_name;`;
var create_task = snowflake.createStatement({sqlText: sql_task});
create_task.execute();
var start_task = snowflake.createStatement({sqlText: `ALTER TASK IF EXISTS schema.ppw_freshness_schema.stream_name RESUME;`});
start_task.execute();
}
// error handling
$$;
在下面查看我通过存储过程创建的任务是如何每次 运行ning 的,因为它永远不会清空流。一旦我手动创建相同的任务,它就可以清空流,并最终在没有新数据时跳过 运行s(这是想要的行为)。
没有任何东西可以提供有关这里问题的线索,因为它存在于流本身的命名中。所以完全是我的错误。
最重要的是,我 运行 的测试使用了非常活跃的 table,因此它掩盖了流实际上按预期执行的事实。
TLDR:当我或我直接创建的任务在其上发出 DML 时,我有流被消耗。但是当它是由存储过程创建的任务时,流不会被消耗。
我有一个流,它的行为符合预期,当我在上面 select 时,我可以看到它有数据:
SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');
我使用创建它的相同角色来使用流:
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);
我又SELECT了SYSTEM$STREAM_HAS_DATA,一切都好它被消耗了。
现在,我将其捆绑到一个任务中:
CREATE TASK IF NOT EXISTS ANALYTICS_DB.schema.table_test
WAREHOUSE = wh
SCHEDULE = 'USING CRON * * * * * Etc/UTC'
COMMENT = 'Checking when was the last time tables got updated'
WHEN -- conditional check if the stream has new data
SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
AS -- same previous query
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(MODIFIED_AT)
FROM FROM ANALYTICS_DB.schema.stream_name;
大约一分钟后,我再次检查我的流,一切正常,它会在运行按计划进行时消耗流。
创建任务的存储过程。
我的SQL部分:
create PROCEDURE IF NOT EXISTS ANALYTICS_DB.schema.create_tasks()
returns string
language javascript
EXECUTE AS CALLER
as
和 javascript 部分(为了读者的缘故,删去了重要的部分)。它 运行 很好,创建任务,任务 运行 根据计划,发出查询但 流未被消耗 。因此,我的 max() 计算是在不断增长的 table.
上完成的$$
// trimmed some stuff here getting the data
while (result_set.next())
{
var lagschema = result_set.getColumnValue(1);
var lagtable = result_set.getColumnValue(2);
var lagcolumn = result_set.getColumnValue(3);
var sql_task = `CREATE TASK IF NOT EXISTS schema.ppw_freshness_schema.stream_name
WAREHOUSE = wh
SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
COMMENT = 'Checking when was the last update'
WHEN
SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
AS
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM FROM ANALYTICS_DB.schema.stream_name;`;
var create_task = snowflake.createStatement({sqlText: sql_task});
create_task.execute();
var start_task = snowflake.createStatement({sqlText: `ALTER TASK IF EXISTS schema.ppw_freshness_schema.stream_name RESUME;`});
start_task.execute();
}
// error handling
$$;
在下面查看我通过存储过程创建的任务是如何每次 运行ning 的,因为它永远不会清空流。一旦我手动创建相同的任务,它就可以清空流,并最终在没有新数据时跳过 运行s(这是想要的行为)。
没有任何东西可以提供有关这里问题的线索,因为它存在于流本身的命名中。所以完全是我的错误。 最重要的是,我 运行 的测试使用了非常活跃的 table,因此它掩盖了流实际上按预期执行的事实。