如何将 Apache Airflow 与 slack 集成?

How to integrate Apache Airflow with slack?

有人可以给我一份关于如何将 Apache Airflow 连接到 Slack 工作区的分步手册。 我为我的频道创建了网络钩子,接下来我应该用它做什么?

亲切的问候

SlackAPIPostOperator(
      task_id='failure',
      token='YOUR_TOKEN',
      text=text_message,
      channel=SLACK_CHANNEL,
      username=SLACK_USER)

以上是使用 Airflow 向 Slack 发送消息的最简单方法。

但是,如果您想配置 Airflow 在任务失败时向 Slack 发送消息,请创建一个函数并使用创建的 Slack 函数的名称将 on_failure_callback 添加到您的任务中。示例如下:

def slack_failed_task(contextDictionary, **kwargs):  
       failed_alert = SlackAPIPostOperator(
         task_id='slack_failed',
         channel="#datalabs",
         token="...",
         text = ':red_circle: DAG Failed',
         owner = '_owner',)
         return failed_alert.execute()


task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)

使用 SlackWebHook(仅适用于 Airflow >= 1.10.0): 如果您想使用 SlackWebHook,请以类似的方式使用 SlackWebhookOperator

https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/slack_webhook_operator.py#L25

尝试新的 SlackWebhookOperator,它存在于 Airflow 版本>=1.10.0

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg = "Hi Wssup?"

slack_test =  SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id='slack_connection',
        webhook_token='/1234/abcd',
        message=slack_msg,
        channel='#airflow_updates',
        username='airflow_'+os.environ['ENVIRONMENT'],
        icon_emoji=None,
        link_names=False,
        dag=dag)

注意:确保您在 Airflow 连接中添加了 slack_connection 作为

host=https://hooks.slack.com/services/

@kaxil 回答中 SlackWebhookOperator 用法的完整示例:

def slack_failed_task(task_name):
  failed_alert = SlackWebhookOperator(
    task_id='slack_failed_alert',
    http_conn_id='slack_connection',
    webhook_token=Variable.get("slackWebhookToken", default_var=""),
    message='@here DAG Failed {}'.format(task_name),
    channel='#epm-marketing-dev',
    username='Airflow_{}'.format(ENVIRONMENT_SUFFIX),
    icon_emoji=':red_circle:',
    link_names=True,
  )
  return failed_alert.execute

task_with_failed_slack_alerts = PythonOperator(
  task_id='task0',
  python_callable=<file to execute>,
  on_failure_callback=slack_failed_task,
  provide_context=True,
  dag=dag)

As @Deep Nirmal 注意:请确保您已将 slack_connection 添加到您的 Airflow 连接中,因为

host=https://hooks.slack.com/services/