如何将 Apache Airflow 与 slack 集成?
How to integrate Apache Airflow with slack?
有人可以给我一份关于如何将 Apache Airflow 连接到 Slack 工作区的分步手册。
我为我的频道创建了网络钩子,接下来我应该用它做什么?
亲切的问候
- 从创建一个 Slack 令牌
https://api.slack.com/custom-integrations/legacy-tokens
- 在您的 DAG 中使用
SlackAPIPostOperator
运算符,如下所示
SlackAPIPostOperator(
task_id='failure',
token='YOUR_TOKEN',
text=text_message,
channel=SLACK_CHANNEL,
username=SLACK_USER)
以上是使用 Airflow 向 Slack 发送消息的最简单方法。
但是,如果您想配置 Airflow 在任务失败时向 Slack 发送消息,请创建一个函数并使用创建的 Slack 函数的名称将 on_failure_callback
添加到您的任务中。示例如下:
def slack_failed_task(contextDictionary, **kwargs):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
token="...",
text = ':red_circle: DAG Failed',
owner = '_owner',)
return failed_alert.execute()
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
使用 SlackWebHook(仅适用于 Airflow >= 1.10.0):
如果您想使用 SlackWebHook
,请以类似的方式使用 SlackWebhookOperator
:
尝试新的 SlackWebhookOperator,它存在于 Airflow 版本>=1.10.0
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
slack_msg = "Hi Wssup?"
slack_test = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack_connection',
webhook_token='/1234/abcd',
message=slack_msg,
channel='#airflow_updates',
username='airflow_'+os.environ['ENVIRONMENT'],
icon_emoji=None,
link_names=False,
dag=dag)
注意:确保您在 Airflow 连接中添加了 slack_connection
作为
host=https://hooks.slack.com/services/
@kaxil 回答中 SlackWebhookOperator
用法的完整示例:
def slack_failed_task(task_name):
failed_alert = SlackWebhookOperator(
task_id='slack_failed_alert',
http_conn_id='slack_connection',
webhook_token=Variable.get("slackWebhookToken", default_var=""),
message='@here DAG Failed {}'.format(task_name),
channel='#epm-marketing-dev',
username='Airflow_{}'.format(ENVIRONMENT_SUFFIX),
icon_emoji=':red_circle:',
link_names=True,
)
return failed_alert.execute
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
As @Deep Nirmal 注意:请确保您已将 slack_connection 添加到您的 Airflow 连接中,因为
host=https://hooks.slack.com/services/
有人可以给我一份关于如何将 Apache Airflow 连接到 Slack 工作区的分步手册。 我为我的频道创建了网络钩子,接下来我应该用它做什么?
亲切的问候
- 从创建一个 Slack 令牌 https://api.slack.com/custom-integrations/legacy-tokens
- 在您的 DAG 中使用
SlackAPIPostOperator
运算符,如下所示
SlackAPIPostOperator(
task_id='failure',
token='YOUR_TOKEN',
text=text_message,
channel=SLACK_CHANNEL,
username=SLACK_USER)
以上是使用 Airflow 向 Slack 发送消息的最简单方法。
但是,如果您想配置 Airflow 在任务失败时向 Slack 发送消息,请创建一个函数并使用创建的 Slack 函数的名称将 on_failure_callback
添加到您的任务中。示例如下:
def slack_failed_task(contextDictionary, **kwargs):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
token="...",
text = ':red_circle: DAG Failed',
owner = '_owner',)
return failed_alert.execute()
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
使用 SlackWebHook(仅适用于 Airflow >= 1.10.0):
如果您想使用 SlackWebHook
,请以类似的方式使用 SlackWebhookOperator
:
尝试新的 SlackWebhookOperator,它存在于 Airflow 版本>=1.10.0
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
slack_msg = "Hi Wssup?"
slack_test = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack_connection',
webhook_token='/1234/abcd',
message=slack_msg,
channel='#airflow_updates',
username='airflow_'+os.environ['ENVIRONMENT'],
icon_emoji=None,
link_names=False,
dag=dag)
注意:确保您在 Airflow 连接中添加了 slack_connection
作为
host=https://hooks.slack.com/services/
@kaxil 回答中 SlackWebhookOperator
用法的完整示例:
def slack_failed_task(task_name):
failed_alert = SlackWebhookOperator(
task_id='slack_failed_alert',
http_conn_id='slack_connection',
webhook_token=Variable.get("slackWebhookToken", default_var=""),
message='@here DAG Failed {}'.format(task_name),
channel='#epm-marketing-dev',
username='Airflow_{}'.format(ENVIRONMENT_SUFFIX),
icon_emoji=':red_circle:',
link_names=True,
)
return failed_alert.execute
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
As @Deep Nirmal 注意:请确保您已将 slack_connection 添加到您的 Airflow 连接中,因为
host=https://hooks.slack.com/services/