SlackAPIPostOperator 中的气流 UI link?
Airflow UI link in SlackAPIPostOperator?
我在 Airflow 中使用 SlackAPIPostOperator
在任务失败时发送 Slack 消息。
我想知道是否有一种聪明的方法可以将失败任务的 link 气流 UI 日志页面添加到松弛消息中。
下面是我要实现的例子:
当前消息是:
def slack_failed_task(context):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']))
return failed_alert.execute(context=context)
您可以使用 [webserver]
部分下的配置值 base_url
将 url 构建到 UI,然后使用 Slack 的 message format <http://example.com|stuff>
链接。
from airflow import configuration
def slack_failed_task(context):
link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
base_url=configuration.get('webserver', 'BASE_URL'),
dag_id=context['dag'].dag_id,
task_id=context['task_instance'].task_id,
execution_date=context['ts'])) # equal to context['execution_date'].isoformat())
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']) +
'\nSee ' + link + ' to debug')
return failed_alert.execute(context=context)
我们也可以使用 log_url
attribute in the Task Instance
def slack_failed_task(context):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']) +
'\nLogs: <{url}|to Airflow UI>'.format(url=context['task_instance'].log_url)
)
return failed_alert.execute(context=context)
我知道这至少从 1.10.4 版本开始可用。
我在 Airflow 中使用 SlackAPIPostOperator
在任务失败时发送 Slack 消息。
我想知道是否有一种聪明的方法可以将失败任务的 link 气流 UI 日志页面添加到松弛消息中。
下面是我要实现的例子:
当前消息是:
def slack_failed_task(context):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']))
return failed_alert.execute(context=context)
您可以使用 [webserver]
部分下的配置值 base_url
将 url 构建到 UI,然后使用 Slack 的 message format <http://example.com|stuff>
链接。
from airflow import configuration
def slack_failed_task(context):
link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
base_url=configuration.get('webserver', 'BASE_URL'),
dag_id=context['dag'].dag_id,
task_id=context['task_instance'].task_id,
execution_date=context['ts'])) # equal to context['execution_date'].isoformat())
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']) +
'\nSee ' + link + ' to debug')
return failed_alert.execute(context=context)
我们也可以使用 log_url
attribute in the Task Instance
def slack_failed_task(context):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']) +
'\nLogs: <{url}|to Airflow UI>'.format(url=context['task_instance'].log_url)
)
return failed_alert.execute(context=context)
我知道这至少从 1.10.4 版本开始可用。