如何在预任务失败且 depends_on_past = true 时触发气流中的任务?
How to trigger task in airflow when pre-task failed and depends_on_past = true?
我有一个任务需要每 5 分钟触发一次。而下一个任务需要等待上一个任务完成(无论上一个任务成功或失败)。
这是我配置参数的方法。
default_args = {
'owner': 'airflow',
'start_date': datetime(2019, 7, 1, 8, 30),
'max_active_runs': 1,
'depends_on_past': True,
'execution_timeout': timedelta(seconds=300)
}
dag = DAG(
dag_id='dag1', default_args=default_args,
schedule_interval='*/5 8-16 * * *',
dagrun_timeout=timedelta(minutes=600))
def task1(ds, **kwargs):
#do something
task1 = PythonOperator(
task_id='task1',
provide_context=True,
python_callable=task1,
trigger_rule= 'all_done',
dag=dag)
在我的配置下,task1会在前一个状态成功时被触发,而在前一个状态失败时被阻塞。
我在 airflow 的文档 "trigger_rule can be used in conjunction with depends_on_past (boolean) that, when set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded" 中找到了描述。
那么怎样才能达到在前面的task1已经完成的情况下,不管pre-state是什么,都触发task1的目的呢?
max_active_runs: 1 将只允许一次执行 dag,覆盖您等待之前的要求。
您有 2 个 depends_on_past 设置。删除一个并将其设置为 false。
我有一个任务需要每 5 分钟触发一次。而下一个任务需要等待上一个任务完成(无论上一个任务成功或失败)。
这是我配置参数的方法。
default_args = {
'owner': 'airflow',
'start_date': datetime(2019, 7, 1, 8, 30),
'max_active_runs': 1,
'depends_on_past': True,
'execution_timeout': timedelta(seconds=300)
}
dag = DAG(
dag_id='dag1', default_args=default_args,
schedule_interval='*/5 8-16 * * *',
dagrun_timeout=timedelta(minutes=600))
def task1(ds, **kwargs):
#do something
task1 = PythonOperator(
task_id='task1',
provide_context=True,
python_callable=task1,
trigger_rule= 'all_done',
dag=dag)
在我的配置下,task1会在前一个状态成功时被触发,而在前一个状态失败时被阻塞。 我在 airflow 的文档 "trigger_rule can be used in conjunction with depends_on_past (boolean) that, when set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded" 中找到了描述。
那么怎样才能达到在前面的task1已经完成的情况下,不管pre-state是什么,都触发task1的目的呢?
max_active_runs: 1 将只允许一次执行 dag,覆盖您等待之前的要求。
您有 2 个 depends_on_past 设置。删除一个并将其设置为 false。