Airflow 中的动态任务定义
Dynamic task definition in Airflow
我目前正在尝试使用 Airflow 编排一个过程,其中一些运算符是动态定义的,并依赖于另一个(较早的)运算符的输出。
在下面的代码中,t1 用新记录更新了一个文本文件(这些实际上是从外部队列中读取的,但为了简单起见,我在这里将它们硬编码为 A、B 和 C)。然后,我想为从该文本文件中读取的每条记录创建单独的运算符。这些运算符将分别创建目录 A、B 和 C,并且在 Airflow 中 UI 将被视为单独的 bash 进程 Create_directory_A、Create_directory_B 和 Create_directory_C。
dag = DAG('Test_DAG',
description="Lorem ipsum.",
start_date=datetime(2017, 3, 20),
schedule_interval=None,
catchup=False)
def create_text_file(list_of_rows):
text_file = open('text_file.txt', "w")
for row in list_of_rows:
text_file.write(row + '\n')
text_file.close()
def read_text():
txt_file = open('text_file.txt', 'r')
return [element for element in txt_file.readlines()]
t1 = PythonOperator(
task_id='Create_text_file',
python_callable=create_text_file,
op_args=[['A', 'B', 'C']],
dag=dag
)
for row in read_text():
t2 = BashOperator(
task_id='Create_directory_{}'.format(row),
bash_command="mkdir {{params.dir_name}}",
params={'dir_name': row},
dag=dag
)
t1 >> t2
在 Airflow’s documentation 中,我可以看到 调度程序将定期执行它 [DAG] 以反映任何变化 。这是否意味着存在这样的风险,即使我的 t1 运算符在 t2 之前执行,bash 运算符是在更新之前为记录列表创建的(因为那是评估 DAG 的时候)?
此代码实际上将创建一个 t2
实例,它将是 bash 运算符,它是用从 read_text()
获得的最后一个 row
构建的。我确定这不是你想要的。
更好的方法是为您的 t2
运算符创建一个单独的 DAG,它在 t1
写入文件时触发。有一个关于此的 SO 问题可能会有所帮助:
您不能动态创建依赖于上游任务输出的任务。您混淆了计划和执行时间。 DAG definition 和 task 在计划时间创建。 DAG 运行 和 任务实例 在执行时创建。只有一个任务实例可以产生输出。
Airflow 调度程序将使用 text_file.txt
在 调度时间 包含的任何内容构建动态图。然后将这些任务发送给工人。
一个worker最终会执行t1
个任务实例并创建一个新的text_file.txt
,但是此时t2
个任务的列表已经被调度器计算出来了发给工人了。
因此,无论最新的 t1
任务实例转储到 text_file.txt
中,都将在下次调度程序决定 运行 DAG 时使用。
如果你的任务很快,而且你的工人没有积压,那将是以前 DAG 运行 的内容。如果它们被积压,text_file.txt
内容可能会过时,如果你真的不走运,调度程序会在任务实例写入文件时读取文件,你将从 read_text()
获得不完整的数据.
我目前正在尝试使用 Airflow 编排一个过程,其中一些运算符是动态定义的,并依赖于另一个(较早的)运算符的输出。
在下面的代码中,t1 用新记录更新了一个文本文件(这些实际上是从外部队列中读取的,但为了简单起见,我在这里将它们硬编码为 A、B 和 C)。然后,我想为从该文本文件中读取的每条记录创建单独的运算符。这些运算符将分别创建目录 A、B 和 C,并且在 Airflow 中 UI 将被视为单独的 bash 进程 Create_directory_A、Create_directory_B 和 Create_directory_C。
dag = DAG('Test_DAG',
description="Lorem ipsum.",
start_date=datetime(2017, 3, 20),
schedule_interval=None,
catchup=False)
def create_text_file(list_of_rows):
text_file = open('text_file.txt', "w")
for row in list_of_rows:
text_file.write(row + '\n')
text_file.close()
def read_text():
txt_file = open('text_file.txt', 'r')
return [element for element in txt_file.readlines()]
t1 = PythonOperator(
task_id='Create_text_file',
python_callable=create_text_file,
op_args=[['A', 'B', 'C']],
dag=dag
)
for row in read_text():
t2 = BashOperator(
task_id='Create_directory_{}'.format(row),
bash_command="mkdir {{params.dir_name}}",
params={'dir_name': row},
dag=dag
)
t1 >> t2
在 Airflow’s documentation 中,我可以看到 调度程序将定期执行它 [DAG] 以反映任何变化 。这是否意味着存在这样的风险,即使我的 t1 运算符在 t2 之前执行,bash 运算符是在更新之前为记录列表创建的(因为那是评估 DAG 的时候)?
此代码实际上将创建一个 t2
实例,它将是 bash 运算符,它是用从 read_text()
获得的最后一个 row
构建的。我确定这不是你想要的。
更好的方法是为您的 t2
运算符创建一个单独的 DAG,它在 t1
写入文件时触发。有一个关于此的 SO 问题可能会有所帮助:
您不能动态创建依赖于上游任务输出的任务。您混淆了计划和执行时间。 DAG definition 和 task 在计划时间创建。 DAG 运行 和 任务实例 在执行时创建。只有一个任务实例可以产生输出。
Airflow 调度程序将使用 text_file.txt
在 调度时间 包含的任何内容构建动态图。然后将这些任务发送给工人。
一个worker最终会执行t1
个任务实例并创建一个新的text_file.txt
,但是此时t2
个任务的列表已经被调度器计算出来了发给工人了。
因此,无论最新的 t1
任务实例转储到 text_file.txt
中,都将在下次调度程序决定 运行 DAG 时使用。
如果你的任务很快,而且你的工人没有积压,那将是以前 DAG 运行 的内容。如果它们被积压,text_file.txt
内容可能会过时,如果你真的不走运,调度程序会在任务实例写入文件时读取文件,你将从 read_text()
获得不完整的数据.