Snowflake Tasks - Visualizing hierarchy
Snowflake allows creating chains of tasks with the AFTER clause of CREATE TASK:
AFTER string
Specifies the predecessor task for the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag).
This parameter enables defining a simple tree of tasks; i.e. a set of tasks organized by their dependencies. In this context, a tree is a series of tasks that start with a scheduled root task and are linked together by their dependencies.
Suppose we have the following setup:
CREATE DATABASE TEST;
CREATE WAREHOUSE Developer WITH WAREHOUSE_SIZE = 'XSMALL'
WAREHOUSE_TYPE = 'STANDARD';
CREATE SCHEMA TEST;
CREATE OR REPLACE TASK task1 WAREHOUSE = Developer SCHEDULE = '10 minute'
AS SELECT system$wait(20);
CREATE OR REPLACE TASK task2 WAREHOUSE = Developer AFTER task1
AS SELECT system$wait(30);
CREATE OR REPLACE TASK task3 WAREHOUSE = Developer AFTER task2
AS SELECT system$wait(60);
CREATE OR REPLACE TASK task4 WAREHOUSE = Developer AFTER task1
AS SELECT system$wait(20);
CREATE OR REPLACE TASK task5 WAREHOUSE = Developer AFTER task1
AS SELECT system$wait(30);
CREATE OR REPLACE TASK task6 WAREHOUSE = Developer AFTER task3
AS SELECT system$wait(40);
CREATE OR REPLACE TASK task7 WAREHOUSE = Developer AFTER task5
AS SELECT system$wait(50);
CREATE OR REPLACE TASK task8 WAREHOUSE = Developer AFTER task5
AS SELECT system$wait(30);
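The goal is to get a graphical representation of the tasks, either for a quick overview or for documentation.
Snowflake provides the TASK_DEPENDENTS table function: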
This table function returns the list of child tasks for a given root (i.e. parent) task in a simple tree of tasks.
SELECT CONCAT_WS('.', DATABASE_NAME, SCHEMA_NAME, NAME) AS TASK_NAME, PREDECESSOR
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(TASK_NAME => 'task1',
RECURSIVE => TRUE ));
/*
TASK_NAME PREDECESSOR
TEST.TEST.TASK1
TEST.TEST.TASK2 TEST.TEST.TASK1
TEST.TEST.TASK4 TEST.TEST.TASK1
TEST.TEST.TASK5 TEST.TEST.TASK1
TEST.TEST.TASK3 TEST.TEST.TASK2
TEST.TEST.TASK7 TEST.TEST.TASK5
TEST.TEST.TASK8 TEST.TEST.TASK5
TEST.TEST.TASK6 TEST.TEST.TASK3
*/
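For a quick look without any diagram tooling, the (TASK_NAME, PREDECESSOR) pairs above can be folded into an indented outline. The following is a minimal Python sketch, not something Snowflake provides: `render_tree` and the hard-coded `ROWS` list are hypothetical, and the rows are copied by hand from the query output rather than fetched from a live account.

```python
from collections import defaultdict

# Rows copied by hand from the TASK_DEPENDENTS output: (task_name, predecessor).
ROWS = [
    ("TEST.TEST.TASK1", None),
    ("TEST.TEST.TASK2", "TEST.TEST.TASK1"),
    ("TEST.TEST.TASK4", "TEST.TEST.TASK1"),
    ("TEST.TEST.TASK5", "TEST.TEST.TASK1"),
    ("TEST.TEST.TASK3", "TEST.TEST.TASK2"),
    ("TEST.TEST.TASK7", "TEST.TEST.TASK5"),
    ("TEST.TEST.TASK8", "TEST.TEST.TASK5"),
    ("TEST.TEST.TASK6", "TEST.TEST.TASK3"),
]


def render_tree(rows):
    """Return an indented text outline of the task tree (hypothetical helper)."""
    children = defaultdict(list)
    roots = []
    for task, predecessor in rows:
        if predecessor is None:
            roots.append(task)          # a task without a predecessor is a root
        else:
            children[predecessor].append(task)

    lines = []

    def walk(task, depth):
        # Two spaces of indentation per level below the root.
        lines.append("  " * depth + task)
        for child in sorted(children[task]):
            walk(child, depth + 1)

    for root in sorted(roots):
        walk(root, 0)
    return "\n".join(lines)


print(render_tree(ROWS))
```

The outline is depth-first, so each branch (for example TASK2, TASK3, TASK6) is listed together under its parent.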
Using the "diagram as code" idea and Mermaid, we can generate the following flowchart:
WITH RECURSIVE cte AS (
SELECT CONCAT_WS('.', DATABASE_NAME, SCHEMA_NAME, NAME) AS TASK_NAME, *
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
TASK_NAME => 'task1', RECURSIVE => TRUE ))
-- here goes task name
), rec AS (
SELECT
0 AS lvl, cte.TASK_NAME, cte.PREDECESSOR,
REPLACE(REPLACE(REPLACE(
'ROOT{.} -- "SCHEDULE: <schedule>;CONDITION: <condition>" --> <root>'
,'<schedule>', COALESCE(cte.SCHEDULE, '<none>'))
,'<condition>', COALESCE(cte.CONDITION,'<none>'))
,'<root>', cte.TASK_NAME) AS GRAPH_ENTRY
FROM cte
WHERE PREDECESSOR IS NULL
UNION ALL
SELECT rec.lvl + 1 AS lvl, cte.TASK_NAME, cte.PREDECESSOR,
REPLACE(REPLACE('<T1> --> <T2>'
,'<T1>', cte.PREDECESSOR)
,'<T2>', cte.TASK_NAME) AS GRAPH_ENTRY
FROM rec
JOIN cte ON rec.TASK_NAME = cte.PREDECESSOR
)
SELECT 'graph TD' || CHAR(13) ||
LISTAGG(CHAR(9) || GRAPH_ENTRY || CHAR(13), '')
WITHIN GROUP(ORDER BY lvl) AS flow_chart
FROM rec;
This produces the following output:
graph TD
ROOT{.} -- "SCHEDULE: 10 minute;CONDITION: <none>" --> TEST.TEST.TASK1
TEST.TEST.TASK1 --> TEST.TEST.TASK2
TEST.TEST.TASK1 --> TEST.TEST.TASK4
TEST.TEST.TASK1 --> TEST.TEST.TASK5
TEST.TEST.TASK2 --> TEST.TEST.TASK3
TEST.TEST.TASK5 --> TEST.TEST.TASK7
TEST.TEST.TASK5 --> TEST.TEST.TASK8
TEST.TEST.TASK3 --> TEST.TEST.TASK6
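The same flowchart text can also be assembled outside the database from the TASK_DEPENDENTS rows. The sketch below mirrors the recursive CTE with a breadth-first walk, so edges come out level by level. `mermaid_flowchart` is a hypothetical helper, the rows are hard-coded from the earlier output, and the schedule is passed in by hand rather than read from the SCHEDULE column.

```python
from collections import deque

# (task_name, predecessor) pairs, hard-coded from the TASK_DEPENDENTS output.
ROWS = [
    ("TEST.TEST.TASK1", None),
    ("TEST.TEST.TASK2", "TEST.TEST.TASK1"),
    ("TEST.TEST.TASK4", "TEST.TEST.TASK1"),
    ("TEST.TEST.TASK5", "TEST.TEST.TASK1"),
    ("TEST.TEST.TASK3", "TEST.TEST.TASK2"),
    ("TEST.TEST.TASK7", "TEST.TEST.TASK5"),
    ("TEST.TEST.TASK8", "TEST.TEST.TASK5"),
    ("TEST.TEST.TASK6", "TEST.TEST.TASK3"),
]


def mermaid_flowchart(rows, schedule=None, condition=None):
    """Build Mermaid 'graph TD' text from (task, predecessor) rows."""
    children = {}
    roots = []
    for task, predecessor in rows:
        if predecessor is None:
            roots.append(task)
        else:
            children.setdefault(predecessor, []).append(task)

    lines = ["graph TD"]
    queue = deque()
    for root in roots:
        # The root edge carries the schedule and condition, as in the SQL version.
        lines.append(
            '\tROOT{.} -- "SCHEDULE: %s;CONDITION: %s" --> %s'
            % (schedule or "<none>", condition or "<none>", root)
        )
        queue.append(root)

    # Breadth-first walk: all edges of one level are emitted before the next.
    while queue:
        parent = queue.popleft()
        for child in children.get(parent, []):
            lines.append("\t%s --> %s" % (parent, child))
            queue.append(child)
    return "\n".join(lines)


print(mermaid_flowchart(ROWS, schedule="10 minute"))
```

Keeping the edges in level order is only a readability choice; Mermaid lays out the graph from the edges regardless of the order in which they are listed.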
The output can be rendered with the Mermaid live editor.
Bonus: the same approach can be used to visualize execution history as a Gantt chart.
Enable all tasks (child tasks first, the root task last):
ALTER TASK TEST.TEST.TASK8 RESUME;
ALTER TASK TEST.TEST.TASK7 RESUME;
ALTER TASK TEST.TEST.TASK6 RESUME;
ALTER TASK TEST.TEST.TASK5 RESUME;
ALTER TASK TEST.TEST.TASK4 RESUME;
ALTER TASK TEST.TEST.TASK3 RESUME;
ALTER TASK TEST.TEST.TASK2 RESUME;
ALTER TASK TEST.TEST.TASK1 RESUME;
SHOW TASKS;
Generating the Gantt chart:
SELECT
CONCAT_WS('.', DATABASE_NAME, SCHEMA_NAME, NAME) AS TASK_NAME,
QUERY_START_TIME,
COMPLETED_TIME,
DATEDIFF(SECOND, QUERY_START_TIME, COMPLETED_TIME) AS DURATION_SEC,
TASK_NAME || ':' || TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH:MI:SS')
|| ',' || DURATION_SEC || 's' AS GRAPH_ENTRY,
s.gantt || LISTAGG(CHAR(9) || GRAPH_ENTRY || CHAR(13), '')
WITHIN GROUP(ORDER BY QUERY_START_TIME) OVER() AS graph
FROM TABLE(information_schema.task_history(
scheduled_time_range_start=>'2021-05-16 07:00:00.000'::TIMESTAMP_LTZ))
,LATERAL(SELECT REPLACE(
'gantt
title Task execution
dateFormat YYYY-MM-DD HH:mm:ss
axisFormat %Y-%m-%d %H:%M
section RunId=<run_id>
'
,'<run_id>'
,RUN_ID)
) s(gantt)
WHERE STATE = 'SUCCEEDED'
--AND RUN_ID = x
ORDER BY scheduled_time;
Output:
gantt
title Task execution
dateFormat YYYY-MM-DD HH:mm:ss
axisFormat %Y-%m-%d %H:%M
section RunId=xxxxxx
TEST.TEST.TASK1:2021-05-16 07:13:45,20s
TEST.TEST.TASK5:2021-05-16 07:14:06,31s
TEST.TEST.TASK4:2021-05-16 07:14:09,21s
TEST.TEST.TASK2:2021-05-16 07:14:15,30s
TEST.TEST.TASK8:2021-05-16 07:14:51,34s
TEST.TEST.TASK7:2021-05-16 07:14:51,50s
TEST.TEST.TASK3:2021-05-16 07:15:01,60s
TEST.TEST.TASK6:2021-05-16 07:16:15,40s
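If the TASK_HISTORY rows are pulled into a client first, the Gantt text can be built there instead of with LISTAGG. This is a rough Python sketch under stated assumptions: `mermaid_gantt` is a hypothetical helper, and the two sample rows are taken from the output above, with completion times reconstructed as start time plus the reported duration.

```python
from datetime import datetime


def mermaid_gantt(history, run_id):
    """Build Mermaid gantt text from (task_name, start, end) tuples."""
    lines = [
        "gantt",
        "\ttitle Task execution",
        "\tdateFormat YYYY-MM-DD HH:mm:ss",
        "\taxisFormat %Y-%m-%d %H:%M",
        f"\tsection RunId={run_id}",
    ]
    # Order by start time, like WITHIN GROUP (ORDER BY QUERY_START_TIME).
    for name, start, end in sorted(history, key=lambda row: row[1]):
        # Whole seconds only; fractional seconds are truncated here.
        duration = int((end - start).total_seconds())
        stamp = start.strftime("%Y-%m-%d %H:%M:%S")
        lines.append(f"\t{name}:{stamp},{duration}s")
    return "\n".join(lines)


# Sample rows: end times are assumed (start + duration from the output).
HISTORY = [
    ("TEST.TEST.TASK5", datetime(2021, 5, 16, 7, 14, 6), datetime(2021, 5, 16, 7, 14, 37)),
    ("TEST.TEST.TASK1", datetime(2021, 5, 16, 7, 13, 45), datetime(2021, 5, 16, 7, 14, 5)),
]

print(mermaid_gantt(HISTORY, "xxxxxx"))
```

Each task line follows the `name:start,duration` form used in the output above, so the result can be pasted into the Mermaid live editor the same way.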