在 DAG 中动态创建任务时 Apache Airflow 超时错误
Apache Airflow Timeout error when dynamically creating tasks in DAG
在我的旧 DAG 中,我创建了这样的任务:
start_task = DummyOperator(task_id = "start_task")
t1 = PythonOperator(task_id = "t1", python_callable = get_t1)
t2 = PythonOperator(task_id = "t2", python_callable = get_t2)
t3 = PythonOperator(task_id = "t3", python_callable = get_t3)
t4 = PythonOperator(task_id = "t4", python_callable = get_t4)
t5 = PythonOperator(task_id = "t5", python_callable = get_t5)
t6 = PythonOperator(task_id = "t6", python_callable = get_t6)
t7 = PythonOperator(task_id = "t7", python_callable = get_t7)
t8 = PythonOperator(task_id = "t8", python_callable = get_t8)
t9 = PythonOperator(task_id = "t9", python_callable = get_t9)
t10 = PythonOperator(task_id = "t10", python_callable = get_t10)
t11 = PythonOperator(task_id = "t11", python_callable = get_t11)
end_task = DummyOperator(task_id = "end_task")
start_task >> [t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11] >> end_task
这些任务中的每一个 运行 都是一个不同的查询,并且每个任务都是 运行 并发的。我修改了我的代码,因为其中大部分是多余的,可以放在函数中。在我的新代码中,我还尝试通过从 .json 中读取每个任务的查询和元数据来动态创建任务。
新代码:
loaded_info = load_info() # function call to load .json data into a list
start_task = DummyOperator(task_id = "start_task")
end_task = DummyOperator(task_id = "end_task")
tasks = [] # empty list to append tasks to in for loop
for x in loaded_info:
qce = QCError(**x)
id = qce.column
task = PythonOperator(task_id = id, python_callable = create_task(qce))
tasks.append(task)
start_task >> tasks >> end_task
这个新代码看起来很好,但是它阻止了我运行宁airflow initdb
。在 运行 执行命令后,终端将一直等待并且永远不会完成,直到我最终 CRTL+C 杀死它,然后最终在杀死后给我一个错误:
raise AirflowTaskTimeout(self.error_message)
pandas.io.sql.DatabaseError: Execution failed on sql 'select ..., count(*) as frequency from ... where ... <> all (array['...', '...', etc.]) or ... is null group by ... order by ... asc': Timeout, PID: 315
(注意:上面错误语句中的查询只是.json中的第一个查询)。考虑到我在使用旧 DAG 时从未出现过此错误,我假设这是由于动态任务创建造成的,但我需要帮助来确定究竟是什么导致了此错误。
我尝试过的:
- 运行 Airflow Webserver Ad-Hoc 中的每个查询(它们都工作正常)
- 在本地创建一个测试脚本 运行 并输出 .json 的内容以确保所有内容的格式正确等
我终于从 airflow initdb
到 运行(但我还没有测试我的工作,稍后会更新它的状态)。
事实证明,在定义 python 运算符时,您不能像我那样包含参数:
task = PythonOperator(task_id = id, python_callable = create_task(qce))
将 qce
传递给 create_tasks
是导致错误的原因。要将参数传递给您的任务,请参阅 。
对于那些想要查看针对我的确切案例的修复的人,我有这个:
with DAG("dva_event_analysis_dag", default_args = DEFAULT_ARGS, schedule_interval = None, catchup = False) as dag:
loaded_info = load_info()
start_task = DummyOperator(task_id = "start_task")
end_task = DummyOperator(task_id = "end_task")
tasks = []
for x in loaded_info:
id = x["column"]
task = PythonOperator(task_id = id, provide_context = True, python_callable = create_task, op_kwargs = x)
tasks.append(task)
start_task >> tasks >> end_task
更新(2019 年 7 月 3 日):作业状态为成功。这确实是对我的错误的修复。希望这可以帮助其他有类似问题的人。
在我的旧 DAG 中,我创建了这样的任务:
start_task = DummyOperator(task_id = "start_task")
t1 = PythonOperator(task_id = "t1", python_callable = get_t1)
t2 = PythonOperator(task_id = "t2", python_callable = get_t2)
t3 = PythonOperator(task_id = "t3", python_callable = get_t3)
t4 = PythonOperator(task_id = "t4", python_callable = get_t4)
t5 = PythonOperator(task_id = "t5", python_callable = get_t5)
t6 = PythonOperator(task_id = "t6", python_callable = get_t6)
t7 = PythonOperator(task_id = "t7", python_callable = get_t7)
t8 = PythonOperator(task_id = "t8", python_callable = get_t8)
t9 = PythonOperator(task_id = "t9", python_callable = get_t9)
t10 = PythonOperator(task_id = "t10", python_callable = get_t10)
t11 = PythonOperator(task_id = "t11", python_callable = get_t11)
end_task = DummyOperator(task_id = "end_task")
start_task >> [t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11] >> end_task
这些任务中的每一个 运行 都是一个不同的查询,并且每个任务都是 运行 并发的。我修改了我的代码,因为其中大部分是多余的,可以放在函数中。在我的新代码中,我还尝试通过从 .json 中读取每个任务的查询和元数据来动态创建任务。
新代码:
loaded_info = load_info() # function call to load .json data into a list
start_task = DummyOperator(task_id = "start_task")
end_task = DummyOperator(task_id = "end_task")
tasks = [] # empty list to append tasks to in for loop
for x in loaded_info:
qce = QCError(**x)
id = qce.column
task = PythonOperator(task_id = id, python_callable = create_task(qce))
tasks.append(task)
start_task >> tasks >> end_task
这个新代码看起来很好,但是它阻止了我运行宁airflow initdb
。在 运行 执行命令后,终端将一直等待并且永远不会完成,直到我最终 CRTL+C 杀死它,然后最终在杀死后给我一个错误:
raise AirflowTaskTimeout(self.error_message)
pandas.io.sql.DatabaseError: Execution failed on sql 'select ..., count(*) as frequency from ... where ... <> all (array['...', '...', etc.]) or ... is null group by ... order by ... asc': Timeout, PID: 315
(注意:上面错误语句中的查询只是.json中的第一个查询)。考虑到我在使用旧 DAG 时从未出现过此错误,我假设这是由于动态任务创建造成的,但我需要帮助来确定究竟是什么导致了此错误。
我尝试过的:
- 运行 Airflow Webserver Ad-Hoc 中的每个查询(它们都工作正常)
- 在本地创建一个测试脚本 运行 并输出 .json 的内容以确保所有内容的格式正确等
我终于从 airflow initdb
到 运行(但我还没有测试我的工作,稍后会更新它的状态)。
事实证明,在定义 python 运算符时,您不能像我那样包含参数:
task = PythonOperator(task_id = id, python_callable = create_task(qce))
将 qce
传递给 create_tasks
是导致错误的原因。要将参数传递给您的任务,请参阅
对于那些想要查看针对我的确切案例的修复的人,我有这个:
with DAG("dva_event_analysis_dag", default_args = DEFAULT_ARGS, schedule_interval = None, catchup = False) as dag:
loaded_info = load_info()
start_task = DummyOperator(task_id = "start_task")
end_task = DummyOperator(task_id = "end_task")
tasks = []
for x in loaded_info:
id = x["column"]
task = PythonOperator(task_id = id, provide_context = True, python_callable = create_task, op_kwargs = x)
tasks.append(task)
start_task >> tasks >> end_task
更新(2019 年 7 月 3 日):作业状态为成功。这确实是对我的错误的修复。希望这可以帮助其他有类似问题的人。