如果回调函数中发生异常,气流任务不会失败
Airflow tasks not failing if exception happens in callback function
Airflow 任务在 on_success_callback 函数执行期间发生异常时不会失败,即使捕获到错误并且在回调中抛出 AirflowException func.Is 这是正常行为。
如果回调函数执行过程中出现异常,是否有其他方法可以确保任务失败。
我相信当您到达 on_success 回调时,这意味着您的任务已经成功。现在,如果您希望任务仍然因为 on_success 回调中的错误而失败,那么您可能需要实现一个 try except 块,其中您必须手动将任务设置为失败。像这样。
def on_success_callback(context):
try:
raise ValueError
except:
dag = context['dag']
tasks = dag.task_ids
print(context['execution_date'])
dag.clear(
task_ids = tasks,
start_date = context['execution_date'],
end_date = dag.end_date
)
您可以从上下文中派生任务实例,然后将其出错。
on_success_callback
在任务成功完成后执行。
在 on_success_callback
中引发异常不会导致更改任务状态。
如果您在 on_success_callback
中执行的代码假设在出现异常时任务失败,那么该代码应该在任务代码中。
Airflow 任务在 on_success_callback 函数执行期间发生异常时不会失败,即使捕获到错误并且在回调中抛出 AirflowException func.Is 这是正常行为。
如果回调函数执行过程中出现异常,是否有其他方法可以确保任务失败。
我相信当您到达 on_success 回调时,这意味着您的任务已经成功。现在,如果您希望任务仍然因为 on_success 回调中的错误而失败,那么您可能需要实现一个 try except 块,其中您必须手动将任务设置为失败。像这样。
def on_success_callback(context):
try:
raise ValueError
except:
dag = context['dag']
tasks = dag.task_ids
print(context['execution_date'])
dag.clear(
task_ids = tasks,
start_date = context['execution_date'],
end_date = dag.end_date
)
您可以从上下文中派生任务实例,然后将其出错。
on_success_callback
在任务成功完成后执行。
在 on_success_callback
中引发异常不会导致更改任务状态。
如果您在 on_success_callback
中执行的代码假设在出现异常时任务失败,那么该代码应该在任务代码中。