气流:并行链接任务

Airflow: chaining tasks in parallel

我目前正在开发一个需要每月循环处理一长串任务的 DAG。

为此,我创建了一个空列表,然后遍历多个任务,根据新的月份更改它们的 task_ids。

示例:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain

dag = DAG(
    "import_trx_table",
    default_args=default_args,
    schedule_interval="45 9 * * *",
)

task_list = []

for month_ds in month_lst_ds:
    start = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)

    task_list.append(start)

...

chain(*task_list)

问题是 DAG 现在非常长(而且很慢)(因为我迭代了超过 12 个月)。其中一些任务肯定可以 运行 并行。

我试图检查链辅助函数是否有并行链接的方法,但找不到任何东西。

任何suggestions/ideas?

谢谢。

您可以使用 chain 但它在这里并没有真正提供价值。

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 7),
}

dag = DAG(
    "Whosebug_question",
    default_args=default_args,
    schedule_interval="@daily",
)

month_lst_ds = ['Dec', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov']

start_op = DummyOperator(task_id='start_task', dag=dag)
end_op = DummyOperator(task_id='end_task', dag=dag)
for month_ds in month_lst_ds:
    month_op = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)
    start_op >> month_op >> end_op 

如果你愿意可以替换

start_op >> month_op >> end_op 

chain(start_op, month_op, end_op )

这是 DAG 结构在图形视图中的样子: