在 Airflow 中生成多个任务时反转 upstream/downstream 关系
Reversed upstream/downstream relationships when generating multiple tasks in Airflow
本题相关的原代码可参见here。
我对 up 位移运算符和 set_upstream
/set_downstream
方法在我的 DAG 中定义的任务循环中工作感到困惑。当DAG的主执行循环配置如下:
for uid in dash_workers.get_id_creds():
clear_tables.set_downstream(id_worker(uid))
或
for uid in dash_workers.get_id_creds():
clear_tables >> id_worker(uid)
图形如下所示(字母数字序列是用户 ID,也定义了任务 ID):
当我像这样配置 DAG 的主执行循环时:
for uid in dash_workers.get_id_creds():
clear_tables.set_upstream(id_worker(uid))
或
for uid in dash_workers.get_id_creds():
id_worker(uid) >> clear_tables
图形如下所示:
第二张图是我想要的/我期望根据我阅读文档生成的前两个代码片段。如果我希望 clear_tables
在触发我的不同用户 ID 的批量数据解析任务之前先执行,我应该将其指示为 clear_tables >> id_worker(uid)
编辑 -- 这是完整的代码,自从我发布最后几个问题以来已经更新,供参考:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
ENV = os.environ
default_args = {
'start_date': datetime.now(),
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
clear_tables = PythonOperator(
task_id='clear_tables',
python_callable=dash_workers.clear_db,
dag=DAG)
def id_worker(uid):
return PythonOperator(
task_id=id,
python_callable=dash_workers.main_preprocess,
op_args=[uid],
dag=DAG)
for uid in dash_workers.get_id_creds():
preproc_task = id_worker(uid)
clear_tables << preproc_task
在实施@LadislavIndra 的建议后,我继续对位移运算符进行相同的反向实施,以获得正确的依赖关系图。
UPDATE @AshBerlin-Taylor 的回答就是这里发生的事情。我假设 Graph View 和 Tree View 在做同样的事情,但事实并非如此。这是 id_worker(uid) >> clear_tables
在图表视图中的样子:
我当然不希望我的数据预准备例程的最后一步是删除所有数据表!
查看您的其他代码,似乎 get_id_creds
是您的任务,您正试图遍历它,这会产生一些奇怪的交互。
可行的模式是:
clear_tables = MyOperator()
for uid in uid_list:
my_task = MyOperator(task_id=uid)
clear_tables >> my_task
Airflow 中的树视图是 "backwards" 您(和我!)首先想到的。在您的第一个屏幕截图中,它显示 "clear_tables" 在 "AAAG5608078M2" 运行 任务之前必须是 运行。 DAG 状态取决于每个 id worker 任务。因此,它不是任务顺序,而是状态链树。如果这有任何意义的话。
(乍一看这似乎很奇怪,但这是因为 DAG 可以分支出来并返回分支。)
如果您查看 dag 的图形视图,您可能会更幸运。这个有箭头并以更直观的方式显示执行顺序。 (虽然我现在确实发现树视图很有用。只是开始不太清楚)
本题相关的原代码可参见here。
我对 up 位移运算符和 set_upstream
/set_downstream
方法在我的 DAG 中定义的任务循环中工作感到困惑。当DAG的主执行循环配置如下:
for uid in dash_workers.get_id_creds():
clear_tables.set_downstream(id_worker(uid))
或
for uid in dash_workers.get_id_creds():
clear_tables >> id_worker(uid)
图形如下所示(字母数字序列是用户 ID,也定义了任务 ID):
当我像这样配置 DAG 的主执行循环时:
for uid in dash_workers.get_id_creds():
clear_tables.set_upstream(id_worker(uid))
或
for uid in dash_workers.get_id_creds():
id_worker(uid) >> clear_tables
图形如下所示:
第二张图是我想要的/我期望根据我阅读文档生成的前两个代码片段。如果我希望 clear_tables
在触发我的不同用户 ID 的批量数据解析任务之前先执行,我应该将其指示为 clear_tables >> id_worker(uid)
编辑 -- 这是完整的代码,自从我发布最后几个问题以来已经更新,供参考:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
ENV = os.environ
default_args = {
'start_date': datetime.now(),
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
clear_tables = PythonOperator(
task_id='clear_tables',
python_callable=dash_workers.clear_db,
dag=DAG)
def id_worker(uid):
return PythonOperator(
task_id=id,
python_callable=dash_workers.main_preprocess,
op_args=[uid],
dag=DAG)
for uid in dash_workers.get_id_creds():
preproc_task = id_worker(uid)
clear_tables << preproc_task
在实施@LadislavIndra 的建议后,我继续对位移运算符进行相同的反向实施,以获得正确的依赖关系图。
UPDATE @AshBerlin-Taylor 的回答就是这里发生的事情。我假设 Graph View 和 Tree View 在做同样的事情,但事实并非如此。这是 id_worker(uid) >> clear_tables
在图表视图中的样子:
我当然不希望我的数据预准备例程的最后一步是删除所有数据表!
查看您的其他代码,似乎 get_id_creds
是您的任务,您正试图遍历它,这会产生一些奇怪的交互。
可行的模式是:
clear_tables = MyOperator()
for uid in uid_list:
my_task = MyOperator(task_id=uid)
clear_tables >> my_task
Airflow 中的树视图是 "backwards" 您(和我!)首先想到的。在您的第一个屏幕截图中,它显示 "clear_tables" 在 "AAAG5608078M2" 运行 任务之前必须是 运行。 DAG 状态取决于每个 id worker 任务。因此,它不是任务顺序,而是状态链树。如果这有任何意义的话。
(乍一看这似乎很奇怪,但这是因为 DAG 可以分支出来并返回分支。)
如果您查看 dag 的图形视图,您可能会更幸运。这个有箭头并以更直观的方式显示执行顺序。 (虽然我现在确实发现树视图很有用。只是开始不太清楚)