气流默认 on_failure_callback
Airflow default on_failure_callback
在我的 DAG 文件中,我定义了一个 on_failure_callback() 函数来 post Slack 以防失败。
如果我在我的 DAG 中为每个运算符指定,它会很好地工作:on_failure_callback=on_failure_callback()
有没有办法自动(例如通过 default_args,或通过我的 DAG 对象)向我的所有操作员发送信息?
我终于找到了方法。
您可以将 on_failure_callback 作为 default_args
class Foo:
@staticmethod
def get_default_args():
"""
Return default args
:return: default_args
"""
default_args = {
'on_failure_callback': Foo.on_failure_callback
}
return default_args
@staticmethod
def on_failure_callback(context):
"""
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
operator = SlackAPIPostOperator(
task_id='failure',
text=str(context['task_instance']),
token=Variable.get("slack_access_token"),
channel=Variable.get("slack_channel")
)
return operator.execute(context=context)
这里回答晚了,但是可以,您可以在 DAG 的默认值中指定 on_failure_callback。您只需要编写一个自定义函数,确保它可以接受上下文。示例:
def failure_callback(context):
message = [
":red_circle: Task failed",
f"*Dag*: {context['dag_run'].dag_id}",
f"*Run*: {context['dag_run'].run_id}",
f"*Task*: <{context.get('task_instance').log_url}|*{context.get('task_instance').task_id}* failed for execution {context.get('execution_date')}>",
]
# Replace this return with whatever you want
# I usually send a Slack notification here
return "\n".join(message)
with DAG(
...
default_args={
...
"on_failure_callback": failure_callback,
},
) as dag:
...
在我的 DAG 文件中,我定义了一个 on_failure_callback() 函数来 post Slack 以防失败。
如果我在我的 DAG 中为每个运算符指定,它会很好地工作:on_failure_callback=on_failure_callback()
有没有办法自动(例如通过 default_args,或通过我的 DAG 对象)向我的所有操作员发送信息?
我终于找到了方法。
您可以将 on_failure_callback 作为 default_args
class Foo:
@staticmethod
def get_default_args():
"""
Return default args
:return: default_args
"""
default_args = {
'on_failure_callback': Foo.on_failure_callback
}
return default_args
@staticmethod
def on_failure_callback(context):
"""
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
operator = SlackAPIPostOperator(
task_id='failure',
text=str(context['task_instance']),
token=Variable.get("slack_access_token"),
channel=Variable.get("slack_channel")
)
return operator.execute(context=context)
这里回答晚了,但是可以,您可以在 DAG 的默认值中指定 on_failure_callback。您只需要编写一个自定义函数,确保它可以接受上下文。示例:
def failure_callback(context):
message = [
":red_circle: Task failed",
f"*Dag*: {context['dag_run'].dag_id}",
f"*Run*: {context['dag_run'].run_id}",
f"*Task*: <{context.get('task_instance').log_url}|*{context.get('task_instance').task_id}* failed for execution {context.get('execution_date')}>",
]
# Replace this return with whatever you want
# I usually send a Slack notification here
return "\n".join(message)
with DAG(
...
default_args={
...
"on_failure_callback": failure_callback,
},
) as dag:
...