通知不断发布到松弛频道
Notifications keep posting to slack channel
我正在使用 slack webhook 向我的 slack 频道发送消息。
它每隔几分钟就会不断发布消息的问题。
这是我所做的...
我在 util 文件夹下创建了一个简单的函数。
def send_to_slack(text):
conn_id = "https://hooks.slack.com/services/your/slack/URL"
task_slack_alert(text, url, is_error=False, args=None)
def task_slack_alert(msg, url, is_error=False, args=None):
slack_msg = ":red_circle: Task Failed" if is_error else ":green_heart: Task Message"
"""*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_ts}""".format(
task=args["task"],
dag=args["dag"],
exec_ts=args["ts"],
) if args else ""
message = {'text': + msg}
response = requests.post(url=url, data=json.dumps(message))
time.sleep(1)
print(f"Slack response {response}")
if response.status_code != 200:
print(f"Error sending chat message. Got: {response.status_code}")
在我的 dag(在另一个文件夹下)我调用函数
dag 将数据从 oracle 复制到 snowflake db,这没有松弛部分。
在我的 dag 中,我执行以下操作:
x = {‘key1’: [‘value1’, ‘value 2’, … ‘value10]}
send_to_slack('My test message from python')
default_args = {...
'on_failure_callback': send_to_slack, }
with DAG(‘my_dag’,
default_args=default_args,
catchup=False) as dag:
parallel = 4
start = DummyOperator(task_id='start')
tasks = []
i = 0
for s in x.keys():
for t in x.get(s):
task = OracleToSnowflakeOperator(
task_id=s + '_' + t,
source_oracle_conn_id=source_oracle_conn_id,
source_schema=schema,
source_table=table,…
)
if i <= parallel:
task.set_upstream(start)
else:
task.set_upstream(tasks[i - (parallel + 1)])
i = i + 1
tasks.append(task)
我知道如果我在同一个 dag 中定义函数,每次解析 dag 时都会调用它。
不是我的情况,所以怎么了?
谢谢
您在 DAG 文件中调用函数 send_to_slack
,这意味着它会 运行 每次调度程序评估您的 DAG(每隔几分钟)。
您应该:
- 使用 Airflow 附带的 slack operator 并将其放在
OracleToSnowflakeOperator
的下游,并像对待任何其他运算符一样对待它
- 编辑您的
OracleToSnowflakeOperator
,我认为这是自定义的,并将调用 Slack 的逻辑放在那里(使用 slack hook)
基本上您应该将对 Slack 的调用封装在自定义运算符中或使用提供的标准 Slack 运算符,不要将其放在 DAG 定义中。
我正在使用 slack webhook 向我的 slack 频道发送消息。 它每隔几分钟就会不断发布消息的问题。 这是我所做的...
我在 util 文件夹下创建了一个简单的函数。
def send_to_slack(text):
conn_id = "https://hooks.slack.com/services/your/slack/URL"
task_slack_alert(text, url, is_error=False, args=None)
def task_slack_alert(msg, url, is_error=False, args=None):
slack_msg = ":red_circle: Task Failed" if is_error else ":green_heart: Task Message"
"""*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_ts}""".format(
task=args["task"],
dag=args["dag"],
exec_ts=args["ts"],
) if args else ""
message = {'text': + msg}
response = requests.post(url=url, data=json.dumps(message))
time.sleep(1)
print(f"Slack response {response}")
if response.status_code != 200:
print(f"Error sending chat message. Got: {response.status_code}")
在我的 dag(在另一个文件夹下)我调用函数 dag 将数据从 oracle 复制到 snowflake db,这没有松弛部分。 在我的 dag 中,我执行以下操作:
x = {‘key1’: [‘value1’, ‘value 2’, … ‘value10]}
send_to_slack('My test message from python')
default_args = {...
'on_failure_callback': send_to_slack, }
with DAG(‘my_dag’,
default_args=default_args,
catchup=False) as dag:
parallel = 4
start = DummyOperator(task_id='start')
tasks = []
i = 0
for s in x.keys():
for t in x.get(s):
task = OracleToSnowflakeOperator(
task_id=s + '_' + t,
source_oracle_conn_id=source_oracle_conn_id,
source_schema=schema,
source_table=table,…
)
if i <= parallel:
task.set_upstream(start)
else:
task.set_upstream(tasks[i - (parallel + 1)])
i = i + 1
tasks.append(task)
我知道如果我在同一个 dag 中定义函数,每次解析 dag 时都会调用它。 不是我的情况,所以怎么了? 谢谢
您在 DAG 文件中调用函数 send_to_slack
,这意味着它会 运行 每次调度程序评估您的 DAG(每隔几分钟)。
您应该:
- 使用 Airflow 附带的 slack operator 并将其放在
OracleToSnowflakeOperator
的下游,并像对待任何其他运算符一样对待它 - 编辑您的
OracleToSnowflakeOperator
,我认为这是自定义的,并将调用 Slack 的逻辑放在那里(使用 slack hook)
基本上您应该将对 Slack 的调用封装在自定义运算符中或使用提供的标准 Slack 运算符,不要将其放在 DAG 定义中。