如何在 Airflow 上重新启动失败的任务
How to restart a failed task on Airflow
我正在使用 LocalExecutor,我的 dag 有 3 个任务,其中任务 (C) 依赖于任务 (A)。任务 (B) 和任务 (A) 可以 运行 并行,如下所示
A-->C
乙
所以任务 (A) 失败了,但是 任务 (B) 运行 没问题。由于任务 (A) 失败,任务 (C) 尚未 运行。
我的问题是 如何单独 运行 Task(A) 以便 Task(C) 运行s 一旦 Task(A) 完成并且Airflow UI 将它们标记为成功。
在UI中:
- 转到要更改的 运行 的 dag 运行
- 单击 GraphView
- 点击任务 A
- 点击"Clear"
这会让任务A再次运行,如果成功,任务C应该运行。
这是可行的,因为当您清除任务的状态时,调度程序会将其视为在此 dag 运行.
之前没有 运行
这是一个替代解决方案,您可以清除它并自动重试某些任务。如果您只想清除某个任务,则不会使用 -d(下游)标志:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
def clear_upstream_task(context):
execution_date = context.get("execution_date")
clear_tasks = BashOperator(
task_id='clear_tasks',
bash_command=f'airflow tasks clear -s {execution_date} -t t1 -d -y clear_upstream_task'
)
return clear_tasks.execute(context=context)
# Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
with DAG('clear_upstream_task',
start_date=datetime(2021, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=5),
default_args=default_args,
catchup=False
) as dag:
t0 = DummyOperator(
task_id='t0'
)
t1 = DummyOperator(
task_id='t1'
)
t2 = DummyOperator(
task_id='t2'
)
t3 = BashOperator(
task_id='t3',
bash_command='exit 123',
on_failure_callback=clear_upstream_task
)
t0 >> t1 >> t2 >> t3
我正在使用 LocalExecutor,我的 dag 有 3 个任务,其中任务 (C) 依赖于任务 (A)。任务 (B) 和任务 (A) 可以 运行 并行,如下所示
A-->C
乙
所以任务 (A) 失败了,但是 任务 (B) 运行 没问题。由于任务 (A) 失败,任务 (C) 尚未 运行。
我的问题是 如何单独 运行 Task(A) 以便 Task(C) 运行s 一旦 Task(A) 完成并且Airflow UI 将它们标记为成功。
在UI中:
- 转到要更改的 运行 的 dag 运行
- 单击 GraphView
- 点击任务 A
- 点击"Clear"
这会让任务A再次运行,如果成功,任务C应该运行。 这是可行的,因为当您清除任务的状态时,调度程序会将其视为在此 dag 运行.
之前没有 运行这是一个替代解决方案,您可以清除它并自动重试某些任务。如果您只想清除某个任务,则不会使用 -d(下游)标志:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
def clear_upstream_task(context):
execution_date = context.get("execution_date")
clear_tasks = BashOperator(
task_id='clear_tasks',
bash_command=f'airflow tasks clear -s {execution_date} -t t1 -d -y clear_upstream_task'
)
return clear_tasks.execute(context=context)
# Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
with DAG('clear_upstream_task',
start_date=datetime(2021, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=5),
default_args=default_args,
catchup=False
) as dag:
t0 = DummyOperator(
task_id='t0'
)
t1 = DummyOperator(
task_id='t1'
)
t2 = DummyOperator(
task_id='t2'
)
t3 = BashOperator(
task_id='t3',
bash_command='exit 123',
on_failure_callback=clear_upstream_task
)
t0 >> t1 >> t2 >> t3