SlackAPIPostOperator 中的气流 UI link?

Airflow UI link in SlackAPIPostOperator?

我在 Airflow 中使用 SlackAPIPostOperator 在任务失败时发送 Slack 消息。 我想知道是否有一种聪明的方法可以将失败任务的 link 气流 UI 日志页面添加到松弛消息中。

下面是我要实现的例子:

http://myserver-uw1.myaws.com:8080/admin/airflow/graph?execution_date=...&arrange=LR&root=&dag_id=MyDAG&_csrf_token=mytoken

当前消息是:

def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']))
    return failed_alert.execute(context=context)

您可以使用 [webserver] 部分下的配置值 base_url 将 url 构建到 UI,然后使用 Slack 的 message format <http://example.com|stuff> 链接。

from airflow import configuration

def slack_failed_task(context):
    link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
        base_url=configuration.get('webserver', 'BASE_URL'),
        dag_id=context['dag'].dag_id,
        task_id=context['task_instance'].task_id,
        execution_date=context['ts']))  # equal to context['execution_date'].isoformat())

    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) +
             '\nSee ' + link + ' to debug')
    return failed_alert.execute(context=context)

我们也可以使用 log_url attribute in the Task Instance

def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) + 
             '\nLogs: <{url}|to Airflow UI>'.format(url=context['task_instance'].log_url)
    )
    return failed_alert.execute(context=context)

我知道这至少从 1.10.4 版本开始可用。