Airflow 中的动态任务定义

Dynamic task definition in Airflow

我目前正在尝试使用 Airflow 编排一个过程,其中一些运算符是动态定义的,并依赖于另一个(较早的)运算符的输出。

在下面的代码中,t1 用新记录更新了一个文本文件(这些实际上是从外部队列中读取的,但为了简单起见,我在这里将它们硬编码为 A、B 和 C)。然后,我想为从该文本文件中读取的每条记录创建单独的运算符。这些运算符将分别创建目录 A、B 和 C,并且在 Airflow 中 UI 将被视为单独的 bash 进程 Create_directory_A、Create_directory_B 和 Create_directory_C。

dag = DAG('Test_DAG',
          description="Lorem ipsum.",
          start_date=datetime(2017, 3, 20),
          schedule_interval=None,
          catchup=False)


def create_text_file(list_of_rows):
    text_file = open('text_file.txt', "w")
    for row in list_of_rows:
        text_file.write(row + '\n')
    text_file.close()


def read_text():
    txt_file = open('text_file.txt', 'r')
    return [element for element in txt_file.readlines()]


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{params.dir_name}}",
        params={'dir_name': row},
        dag=dag
    )

    t1 >> t2

Airflow’s documentation 中,我可以看到 调度程序将定期执行它 [DAG] 以反映任何变化 。这是否意味着存在这样的风险,即使我的 t1 运算符在 t2 之前执行,bash 运算符是在更新之前为记录列表创建的(因为那是评估 DAG 的时候)?

此代码实际上将创建一个 t2 实例,它将是 bash 运算符,它是用从 read_text() 获得的最后一个 row 构建的。我确定这不是你想要的。

更好的方法是为您的 t2 运算符创建一个单独的 DAG,它在 t1 写入文件时触发。有一个关于此的 SO 问题可能会有所帮助:

您不能动态创建依赖于上游任务输出的任务。您混淆了计划和执行时间。 DAG definitiontask 在计划时间创建。 DAG 运行 任务实例 在执行时创建。只有一个任务实例可以产生输出。

Airflow 调度程序将使用 text_file.txt 调度时间 包含的任何内容构建动态图。然后将这些任务发送给工人。

一个worker最终会执行t1个任务实例并创建一个新的text_file.txt,但是此时t2个任务的列表已经被调度器计算出来了发给工人了。

因此,无论最新的 t1 任务实例转储到 text_file.txt 中,都将在下次调度程序决定 运行 DAG 时使用。

如果你的任务很快,而且你的工人没有积压,那将是以前 DAG 运行 的内容。如果它们被积压,text_file.txt 内容可能会过时,如果你真的不走运,调度程序会在任务实例写入文件时读取文件,你将从 read_text() 获得不完整的数据.