在 Airflow 中循环参数的最佳方法?

Best way to loop through parameters in Airflow?

我正在尝试熟悉 Airflow 并一直喜欢它。

但是我有点不清楚的一件事是如何在我想要 运行 相同的 dag 但并行地用于多个业务线 (lob) 的地方正确地参数化我的 dag。所以基本上我想 运行 下面的 dag 用于每个 运行 中的多个 lob,并让每个 lob 运行 平行。

所以假设我定义了一个变量,它是一个 lob 数组,如“lob1”、“lob2”等。我想用下面的 bigquery sql 语句替换 'mylob' 'lob1' 然后 'lob2' 等等

我在想也许我可以将 lob 存储为 ui 中的变量,然后在 dag 中循环遍历它,但我不确定它是否会在等待时按顺序结束每个循环迭代中要完成的每个任务。

我认为另一种方法可能是使用这个参数化的 dag 作为一种更大的 driver dag 中的子 dag。但再次不确定这是否是最佳实践方法。

非常感谢任何帮助或指点。我觉得我在这里遗漏了一些明显的东西,但不是 quite 在任何地方找到这样的例子。

"""
### My first dag to play around with bigquery and gcp stuff.
"""

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from dateutil import tz
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 10),    
    'email': ['xxx@xxx.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

with DAG('my_bq_dag_2', schedule_interval='30 */1 * * *',
         default_args=default_args) as dag:
      
    bq_msg_1 = BigQueryOperator(
        task_id='my_bq_task_1',
        bql='select "mylob" as lob, "Hello World!" as msg',
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_TRUNCATE',
        bigquery_conn_id='gcp_smoke'
    )
    
    bq_msg_1.doc_md = """\
    #### Task Documentation
    Append a "Hello World!" message string to the table [airflow.msg]
    """
    
    bq_msg_2 = BigQueryOperator(
        task_id='my_bq_task_2',
        bql='select "mylob" as lob, "Goodbye World!" as msg',
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='gcp_smoke'
    )
    
    bq_msg_2.doc_md = """\
    #### Task Documentation
    Append a "Goodbye World!" message string to the table [airflow.msg]
    """
    
    # set dependencies
    bq_msg_2.set_upstream(bq_msg_1)

更新:试图让它工作,但它似乎永远无法实现 lob2


"""
### My first dag to play around with bigquery and gcp stuff.
"""

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 10),    
    'email': ['xxx@xxx.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('my_bq_dag_2', schedule_interval='@once',default_args=default_args)

lobs = ["lob1","lob2","lob3"]

for lob in lobs:

    templated_command = """
        select '{{ params.lob }}' as lob, concat(string(current_timestamp()),' - Hello - {{ ds }}') as msg
    """    
         
    bq_msg_1 = BigQueryOperator(
        dag = dag,
        task_id='my_bq_task_1',
        bql=templated_command,
        params={'lob': lob},
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='gcp_smoke'
    )

使用trigger_dag概念。它适用于此类用例。其中,您将参数从控制器 dag 传递给 subdag。您将在气流安装的示例文件夹中找到示例。

我想我找到了一个似乎对我有用的 answer/approach(我上面的问题是没有唯一的任务 ID)。

写了一个关于示例的小博客 post 以防对其他人有用。

http://engineering.pmc.com/2017/03/playing-around-with-apache-airflow-bigquery-62/