AirFlow DAG 卡在 运行 状态
AirFlow DAG Get stuck in running state
我创建了一个 dag 并每天安排它。
它每天都在排队,但任务实际上并不 运行。
这个问题已经在过去 here 提出,但答案对我没有帮助,所以似乎还有另一个问题。
我的代码在下面分享。我用注释替换了任务 t2 的 SQL。
当我使用 "airflow test..." 在 CLI 上分别 运行 时,每个任务 运行 都成功了。
您能解释一下制作 DAG 运行 应该做什么吗?
谢谢!
这是 DAG 代码:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner' : 'me',
'depends_on_past' : 'true',
'start_date' : datetime(2018, 06, 25),
'email' : ['myemail@moovit.com'],
'email_on_failure':True,
'email_on_retry':False,
'retries' : 2,
'retry_delay' : timedelta(minutes=5)
}
dag = DAG('my_agg_table',
default_args = default_args,
schedule_interval = "30 4 * * *"
)
t1 = BigQueryOperator(
task_id='bq_delete_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql='''
delete `my_project.agg.my_agg_table`
where date = '{{ macros.ds_add(ds, -1)}}'
''',
dag=dag)
t2 = BigQueryOperator(
task_id='bq_insert_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
allow_large_results=True,
bql='''
#standardSQL
Select ... the query continue here.....
''', destination_dataset_table='my_project.agg.my_agg_table',
dag=dag)
t1 >> t2
通常很容易找出任务未完成的原因 运行。在 Airflow 网络中时 UI:
- select 任何感兴趣的 DAG
- 现在点击任务
- 再次单击
Task Instance Details
- 第一行有一个面板
Task Instance State
- 在它旁边的方框
Reason
中是运行 执行任务的原因 - 或者忽略任务的原因
检查未执行的第一个任务通常是有意义的,因为我看到您设置了 depends_on_past=True
如果在错误的情况下使用可能会导致问题。
更多相关信息:Airflow 1.9.0 is queuing but not launching tasks
我创建了一个 dag 并每天安排它。 它每天都在排队,但任务实际上并不 运行。 这个问题已经在过去 here 提出,但答案对我没有帮助,所以似乎还有另一个问题。
我的代码在下面分享。我用注释替换了任务 t2 的 SQL。 当我使用 "airflow test..." 在 CLI 上分别 运行 时,每个任务 运行 都成功了。
您能解释一下制作 DAG 运行 应该做什么吗? 谢谢!
这是 DAG 代码:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner' : 'me',
'depends_on_past' : 'true',
'start_date' : datetime(2018, 06, 25),
'email' : ['myemail@moovit.com'],
'email_on_failure':True,
'email_on_retry':False,
'retries' : 2,
'retry_delay' : timedelta(minutes=5)
}
dag = DAG('my_agg_table',
default_args = default_args,
schedule_interval = "30 4 * * *"
)
t1 = BigQueryOperator(
task_id='bq_delete_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql='''
delete `my_project.agg.my_agg_table`
where date = '{{ macros.ds_add(ds, -1)}}'
''',
dag=dag)
t2 = BigQueryOperator(
task_id='bq_insert_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
allow_large_results=True,
bql='''
#standardSQL
Select ... the query continue here.....
''', destination_dataset_table='my_project.agg.my_agg_table',
dag=dag)
t1 >> t2
通常很容易找出任务未完成的原因 运行。在 Airflow 网络中时 UI:
- select 任何感兴趣的 DAG
- 现在点击任务
- 再次单击
Task Instance Details
- 第一行有一个面板
Task Instance State
- 在它旁边的方框
Reason
中是运行 执行任务的原因 - 或者忽略任务的原因
检查未执行的第一个任务通常是有意义的,因为我看到您设置了 depends_on_past=True
如果在错误的情况下使用可能会导致问题。
更多相关信息:Airflow 1.9.0 is queuing but not launching tasks