任务失败后气流停止调度 dagruns

airflow stops scheduling dagruns after task failure

我对 dag运行 失败时气流的作用感到困惑。我想要实现的行为是:

  1. DAG 的定期触发(每小时)
  2. 重试任务
  3. 如果任务失败 n 次重试,请发送有关失败的电子邮件
  4. 当下一个每小时触发时,触发一个新的 dag运行,就好像什么都没有失败一样。

这些是我的 dag 参数和任务参数:

任务默认值:

'depends_on_past': True,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['email@address.co.uk'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'wait_for_downstream': False,

dag 参数:

schedule_interval=timedelta(minutes=60),
catchup=False,
max_active_runs=1

我想我误解了其中的一些论点,因为在我看来,如果任务失败 n 次(即 dag运行 失败),那么下一个 dag运行 会被安排但是永远处于 运行ning 状态,再也没有 dag运行s 成功(甚至被安排)。例如,这里是 dag运行s(我不知道在哪里可以找到像 这样的基于文本的调度程序日志),其中 dags 被安排为每 5 分钟 运行每小时:

每5分钟执行运行s直到失败,之后最后一次执行就处于运行ning状态,过去30分钟一直如此

我做错了什么?

我应该补充一点,重新启动调度程序没有帮助,手动将 运行ning 任务设置为失败也没有帮助...

您已将 depends_on_past 设置为 True,这会阻止启动下一个 DagRun。

From the docs: depends_on_past (bool) – 当设置为 true 时,任务实例将 运行 顺序地依赖于前一个任务的计划来成功。 start_date 的任务实例允许 运行。

这意味着您的 Dag 正在尝试 运行,但它正在等待,直到来自先前 DagRun 的相应任务具有成功状态。

这个问题让我很头疼,所以我想post一个完整的解决方案。

在我的例子中,即使我有 depends_on_past = False 选项,但在上一次执行失败时下一个 DAG 的执行并未开始。 这是因为 wait_for_downstream 选项为 True 且此组合不兼容。根据文档:

wait_for_downstream (bool) - when set to true, an instance of task X will wait >for tasks immediately downstream of the previous instance of task X to finish >successfully before it runs. This is useful if the different instances of a task >X alter the same asset, and this asset is used by tasks downstream of task X. >Note that depends_on_past is forced to True wherever wait_for_downstream is used.

最后请注意,max_active_runs = 1 选项很重要 被激活是因为在另一种情况下,同一个任务可以在多个后续 dags 运行.

上同时启动 运行ning
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'wait_for_downstream': False,
    'start_date': datetime(2019, 7, 20),
}

dag = DAG(
    dag_id='test_v8',
    default_args=args,
    schedule_interval='* * * * *',
    catchup=False,
    max_active_runs=1

)

from time import sleep


def sleep_1():
    sleep(1)


def sleep_2():
    sleep(2)


sleep_2 = PythonOperator(
    task_id='sleep_2',
    python_callable=sleep_2,
    dag=dag,
)

sleep_1 = PythonOperator(
    task_id='sleep_1',
    python_callable=sleep_1,
    dag=dag,
)

sleep_1 >> sleep_2

终于成功了!