任务失败后气流停止调度 dagruns
airflow stops scheduling dagruns after task failure
我对 dag运行 失败时气流的作用感到困惑。我想要实现的行为是:
- DAG 的定期触发(每小时)
- 重试任务
- 如果任务失败 n 次重试,请发送有关失败的电子邮件
- 当下一个每小时触发时,触发一个新的 dag运行,就好像什么都没有失败一样。
这些是我的 dag 参数和任务参数:
任务默认值:
'depends_on_past': True,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['email@address.co.uk'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'wait_for_downstream': False,
dag 参数:
schedule_interval=timedelta(minutes=60),
catchup=False,
max_active_runs=1
我想我误解了其中的一些论点,因为在我看来,如果任务失败 n 次(即 dag运行 失败),那么下一个 dag运行 会被安排但是永远处于 运行ning 状态,再也没有 dag运行s 成功(甚至被安排)。例如,这里是 dag运行s(我不知道在哪里可以找到像 这样的基于文本的调度程序日志),其中 dags 被安排为每 5 分钟 运行每小时:
每5分钟执行运行s直到失败,之后最后一次执行就处于运行ning状态,过去30分钟一直如此
我做错了什么?
我应该补充一点,重新启动调度程序没有帮助,手动将 运行ning 任务设置为失败也没有帮助...
您已将 depends_on_past
设置为 True,这会阻止启动下一个 DagRun。
From the docs:
depends_on_past (bool) – 当设置为 true 时,任务实例将 运行 顺序地依赖于前一个任务的计划来成功。 start_date 的任务实例允许 运行。
这意味着您的 Dag 正在尝试 运行,但它正在等待,直到来自先前 DagRun 的相应任务具有成功状态。
这个问题让我很头疼,所以我想post一个完整的解决方案。
在我的例子中,即使我有 depends_on_past = False 选项,但在上一次执行失败时下一个 DAG 的执行并未开始。
这是因为 wait_for_downstream 选项为 True 且此组合不兼容。根据文档:
wait_for_downstream (bool) - when set to true, an instance of task X will wait >for tasks immediately downstream of the previous instance of task X to finish >successfully before it runs. This is useful if the different instances of a task >X alter the same asset, and this asset is used by tasks downstream of task X. >Note that depends_on_past is forced to True wherever wait_for_downstream is used.
最后请注意,max_active_runs = 1 选项很重要
被激活是因为在另一种情况下,同一个任务可以在多个后续 dags 运行.
上同时启动 运行ning
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'airflow',
'depends_on_past': False,
'wait_for_downstream': False,
'start_date': datetime(2019, 7, 20),
}
dag = DAG(
dag_id='test_v8',
default_args=args,
schedule_interval='* * * * *',
catchup=False,
max_active_runs=1
)
from time import sleep
def sleep_1():
sleep(1)
def sleep_2():
sleep(2)
sleep_2 = PythonOperator(
task_id='sleep_2',
python_callable=sleep_2,
dag=dag,
)
sleep_1 = PythonOperator(
task_id='sleep_1',
python_callable=sleep_1,
dag=dag,
)
sleep_1 >> sleep_2
终于成功了!
我对 dag运行 失败时气流的作用感到困惑。我想要实现的行为是:
- DAG 的定期触发(每小时)
- 重试任务
- 如果任务失败 n 次重试,请发送有关失败的电子邮件
- 当下一个每小时触发时,触发一个新的 dag运行,就好像什么都没有失败一样。
这些是我的 dag 参数和任务参数:
任务默认值:
'depends_on_past': True,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['email@address.co.uk'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'wait_for_downstream': False,
dag 参数:
schedule_interval=timedelta(minutes=60),
catchup=False,
max_active_runs=1
我想我误解了其中的一些论点,因为在我看来,如果任务失败 n 次(即 dag运行 失败),那么下一个 dag运行 会被安排但是永远处于 运行ning 状态,再也没有 dag运行s 成功(甚至被安排)。例如,这里是 dag运行s(我不知道在哪里可以找到像
每5分钟执行运行s直到失败,之后最后一次执行就处于运行ning状态,过去30分钟一直如此
我做错了什么?
我应该补充一点,重新启动调度程序没有帮助,手动将 运行ning 任务设置为失败也没有帮助...
您已将 depends_on_past
设置为 True,这会阻止启动下一个 DagRun。
From the docs: depends_on_past (bool) – 当设置为 true 时,任务实例将 运行 顺序地依赖于前一个任务的计划来成功。 start_date 的任务实例允许 运行。
这意味着您的 Dag 正在尝试 运行,但它正在等待,直到来自先前 DagRun 的相应任务具有成功状态。
这个问题让我很头疼,所以我想post一个完整的解决方案。
在我的例子中,即使我有 depends_on_past = False 选项,但在上一次执行失败时下一个 DAG 的执行并未开始。 这是因为 wait_for_downstream 选项为 True 且此组合不兼容。根据文档:
wait_for_downstream (bool) - when set to true, an instance of task X will wait >for tasks immediately downstream of the previous instance of task X to finish >successfully before it runs. This is useful if the different instances of a task >X alter the same asset, and this asset is used by tasks downstream of task X. >Note that depends_on_past is forced to True wherever wait_for_downstream is used.
最后请注意,max_active_runs = 1 选项很重要 被激活是因为在另一种情况下,同一个任务可以在多个后续 dags 运行.
上同时启动 运行ningfrom datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'airflow',
'depends_on_past': False,
'wait_for_downstream': False,
'start_date': datetime(2019, 7, 20),
}
dag = DAG(
dag_id='test_v8',
default_args=args,
schedule_interval='* * * * *',
catchup=False,
max_active_runs=1
)
from time import sleep
def sleep_1():
sleep(1)
def sleep_2():
sleep(2)
sleep_2 = PythonOperator(
task_id='sleep_2',
python_callable=sleep_2,
dag=dag,
)
sleep_1 = PythonOperator(
task_id='sleep_1',
python_callable=sleep_1,
dag=dag,
)
sleep_1 >> sleep_2
终于成功了!