在 Apache Airflow DAG 中使用 AWS SES 失败时发送电子邮件
Email on failure using AWS SES in Apache Airflow DAG
每当我的 DAG 中的任务无法 运行 或重试 运行 时,我正在尝试让 Airflow 使用 AWS SES 向我发送电子邮件。我也在使用我的 AWS SES 凭证而不是我的一般 AWS 凭证。
我现在的airflow.cfg
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com
我的 DAG 中的当前任务旨在故意失败并重试:
testfaildag_library_install_jar_jdbc = PythonOperator(
task_id='library_install_jar',
retries=3,
retry_delay=timedelta(seconds=15),
python_callable=add_library_to_cluster,
params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path':s3000://fakepath.jar},
dag=dag,
email_on_failure=True,
email_on_retry=True,
email=’myname@myjob.com’,
provide_context=True
)
一切都按设计工作,因为任务重试了设定的次数并最终失败,除了没有发送电子邮件。我也查看了上面提到的任务中的日志,并没有提到smtp。
我看过类似的问题 here, but the only solution there did not work for me. Additionally, Airflow's documentation such as their example here 似乎也不适合我。
SES 可以与 Airflow 的 email_on_failure 和 email_on_retry 函数一起使用吗?
我目前想做的是使用on_failure_callback
函数调用AWS提供的python脚本here发送失败邮件,但那不是此时的首选路线。
谢谢,感谢您的帮助。
--6 月 8 日更新,SES 正常工作
这是我写的关于我们如何让它全部工作的文章。这个答案的底部有一个小总结。
几个要点:
我们决定不使用 Amazon SES,而是使用 sendmail 我们现在已经启动并运行 SES。
- 它是为
email_on_failure
和 email_on_retry
功能提供服务的气流工作者。您可以在 Dag 运行 期间执行 journalctl –u airflow-worker –f
来监视它。在您的生产服务器上,您不需要在使用新的 smtp 设置更改 airflow.cfg
后重新启动您的 airflow-worker - 它应该会自动获取。无需担心当前 运行ning Dags。
这是关于如何使用 sendmail 的技术文章:
由于我们在本地主机上从 ses 更改为 sendmail,因此我们必须在 airflow.cfg
中更改我们的 smtp 设置。
新配置为:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from = myjob@mywork.com
这适用于生产和本地气流实例。
如果他们的配置与我上面的不同,可能会收到一些常见错误:
socket.error: [Errno 111] Connection refused
-- 您必须将 airflow.cfg
中的 smtp_host
行更改为 localhost
smtplib.SMTPException: STARTTLS extension not supported by server.
-- 您必须将 airflow.cfg
中的 smtp_starttls
更改为 False
在我的本地测试中,我试图简单地强制 airflow 显示它试图发送电子邮件时发生的事情的日志——我创建了一个假的 dag,如下所示:
# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
# General imports
from datetime import datetime,timedelta
def throwerror():
raise ValueError("Failure")
SPARK_V_2_2_1 = '3.5.x-scala2.11'
args = {
'owner': ‘me’,
'email': ['me@myjob'],
'depends_on_past': False,
'start_date': datetime(2018, 5,24),
'end_date':datetime(2018,6,28)
}
dag = DAG(
dag_id='testemaildag',
default_args=args,
catchup=False,
schedule_interval="* 18 * * *"
)
t1 = DummyOperator(
task_id='extract_data',
dag=dag
)
t2 = PythonOperator(
task_id='fail_task',
dag=dag,
python_callable=throwerror
)
t2.set_upstream(t1)
如果您执行 journalctl -u airflow-worker -f
,您会看到工作人员说它已将失败的警报电子邮件发送到您 DAG 中的电子邮件,但我们仍然没有收到该电子邮件。然后我们决定通过 cat /var/log/maillog
查看 sendmail 的邮件日志。我们看到了这样的日志:
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<myjob@mycompany.com>, size=1297, nrcpt=1 (queue active)
Jun 5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun 5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
所以这可能是最重要的 "Oh duh" 时刻。在这里,我们能够看到我们的 smtp 服务中实际发生了什么。我们使用 telnet 确认我们无法从 gmail 连接到目标 IP 范围。
我们确定电子邮件正在尝试发送,但 sendmail 服务无法成功连接到 ip 范围。
我们决定允许 AWS 端口 25 上的所有出站流量(因为我们的气流生产环境是 ec2 实例),现在它可以成功运行。我们现在可以接收有关失败和重试的电子邮件(提示:email_on_failure
和 email_on_retry
在您的 DAG API Reference 中默认为 True
- 您无需将其放入你的 args 如果你不想,但在其中明确声明 True 或 False 仍然是一个好习惯。
SES 现在可以使用了。这是气流配置:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = myemail@myjob.com (Verified SES email)
谢谢!
类似的情况,我尝试按照相同的调试过程进行操作,但没有日志输出。另外,我的airflow ec2实例的出站规则是对所有端口和ip开放的,所以应该是其他原因。
我注意到当您从 SES 创建 SMTP 凭据时,它还会创建一个 IAM 用户。我不确定气流 运行 在您的情况下如何(ec2 实例上的裸机或包装在容器中),以及如何设置用户访问权限。
每当我的 DAG 中的任务无法 运行 或重试 运行 时,我正在尝试让 Airflow 使用 AWS SES 向我发送电子邮件。我也在使用我的 AWS SES 凭证而不是我的一般 AWS 凭证。
我现在的airflow.cfg
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com
我的 DAG 中的当前任务旨在故意失败并重试:
testfaildag_library_install_jar_jdbc = PythonOperator(
task_id='library_install_jar',
retries=3,
retry_delay=timedelta(seconds=15),
python_callable=add_library_to_cluster,
params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path':s3000://fakepath.jar},
dag=dag,
email_on_failure=True,
email_on_retry=True,
email=’myname@myjob.com’,
provide_context=True
)
一切都按设计工作,因为任务重试了设定的次数并最终失败,除了没有发送电子邮件。我也查看了上面提到的任务中的日志,并没有提到smtp。
我看过类似的问题 here, but the only solution there did not work for me. Additionally, Airflow's documentation such as their example here 似乎也不适合我。
SES 可以与 Airflow 的 email_on_failure 和 email_on_retry 函数一起使用吗?
我目前想做的是使用on_failure_callback
函数调用AWS提供的python脚本here发送失败邮件,但那不是此时的首选路线。
谢谢,感谢您的帮助。
--6 月 8 日更新,SES 正常工作
这是我写的关于我们如何让它全部工作的文章。这个答案的底部有一个小总结。
几个要点:
我们决定不使用 Amazon SES,而是使用 sendmail我们现在已经启动并运行 SES。- 它是为
email_on_failure
和email_on_retry
功能提供服务的气流工作者。您可以在 Dag 运行 期间执行journalctl –u airflow-worker –f
来监视它。在您的生产服务器上,您不需要在使用新的 smtp 设置更改airflow.cfg
后重新启动您的 airflow-worker - 它应该会自动获取。无需担心当前 运行ning Dags。
这是关于如何使用 sendmail 的技术文章:
由于我们在本地主机上从 ses 更改为 sendmail,因此我们必须在 airflow.cfg
中更改我们的 smtp 设置。
新配置为:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from = myjob@mywork.com
这适用于生产和本地气流实例。
如果他们的配置与我上面的不同,可能会收到一些常见错误:
socket.error: [Errno 111] Connection refused
-- 您必须将airflow.cfg
中的smtp_host
行更改为localhost
smtplib.SMTPException: STARTTLS extension not supported by server.
-- 您必须将airflow.cfg
中的smtp_starttls
更改为False
在我的本地测试中,我试图简单地强制 airflow 显示它试图发送电子邮件时发生的事情的日志——我创建了一个假的 dag,如下所示:
# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
# General imports
from datetime import datetime,timedelta
def throwerror():
raise ValueError("Failure")
SPARK_V_2_2_1 = '3.5.x-scala2.11'
args = {
'owner': ‘me’,
'email': ['me@myjob'],
'depends_on_past': False,
'start_date': datetime(2018, 5,24),
'end_date':datetime(2018,6,28)
}
dag = DAG(
dag_id='testemaildag',
default_args=args,
catchup=False,
schedule_interval="* 18 * * *"
)
t1 = DummyOperator(
task_id='extract_data',
dag=dag
)
t2 = PythonOperator(
task_id='fail_task',
dag=dag,
python_callable=throwerror
)
t2.set_upstream(t1)
如果您执行 journalctl -u airflow-worker -f
,您会看到工作人员说它已将失败的警报电子邮件发送到您 DAG 中的电子邮件,但我们仍然没有收到该电子邮件。然后我们决定通过 cat /var/log/maillog
查看 sendmail 的邮件日志。我们看到了这样的日志:
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<myjob@mycompany.com>, size=1297, nrcpt=1 (queue active)
Jun 5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun 5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
所以这可能是最重要的 "Oh duh" 时刻。在这里,我们能够看到我们的 smtp 服务中实际发生了什么。我们使用 telnet 确认我们无法从 gmail 连接到目标 IP 范围。
我们确定电子邮件正在尝试发送,但 sendmail 服务无法成功连接到 ip 范围。
我们决定允许 AWS 端口 25 上的所有出站流量(因为我们的气流生产环境是 ec2 实例),现在它可以成功运行。我们现在可以接收有关失败和重试的电子邮件(提示:email_on_failure
和 email_on_retry
在您的 DAG API Reference 中默认为 True
- 您无需将其放入你的 args 如果你不想,但在其中明确声明 True 或 False 仍然是一个好习惯。
SES 现在可以使用了。这是气流配置:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = myemail@myjob.com (Verified SES email)
谢谢!
类似的情况,我尝试按照相同的调试过程进行操作,但没有日志输出。另外,我的airflow ec2实例的出站规则是对所有端口和ip开放的,所以应该是其他原因。
我注意到当您从 SES 创建 SMTP 凭据时,它还会创建一个 IAM 用户。我不确定气流 运行 在您的情况下如何(ec2 实例上的裸机或包装在容器中),以及如何设置用户访问权限。