如何从 Python 脚本中触发气流 DAG 运行?
How to trigger an airflow DAG run from within a Python script?
我使用 apache airflow 创建了一些 DAGS,其中一些未按计划 运行。
我正在尝试找到一种方法,可以从 Python 脚本中为特定 DAG 触发 运行。这可能吗?我能怎么做?
编辑 --- python 脚本将来自与我所有 DAGS 所在的项目不同的项目运行ning
在触发 Airflow DAG 运行时,您有多种选择。
使用Python
airflow python 包提供了一个 local client 可用于从 python 脚本中触发 dag。例如:
from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='test_dag_id', run_id='test_run_id', conf={})
使用 Airflow CLI
您可以使用 Airflow CLI 手动触发气流中的 dag。有关如何使用 CLI 触发 DAG 的更多信息,请参见 here。
使用 Airflow REST API
您还可以使用 Airflow REST api 来触发 DAG 运行。更多信息 here。
python 中的第一个选项可能最适合您(我个人过去也是这样做的)。但理论上您可以使用 subprocess to interact with the CLI from python, or a library like requests 从 Python.
中与 REST API 交互
在 AWS MWAA Airflow 1.10.12 上,我使用了基于 boto3
库的解决方案 Python 和 REST POST 请求:
import boto3
import requests
def TriggerAirflowDAG(mwaa_environment, dag_id):
client = boto3.client("mwaa")
token = client.create_cli_token(Name=mwaa_environment)
url = "https://{0}/aws_mwaa/cli".format(token["WebServerHostname"])
body = f"trigger_dag {dag_id}"
headers = {
"Authorization": "Bearer " + token["CliToken"],
"Content-Type": "text/plain"
}
return requests.post(url, data=body, headers=headers)
User/role 发起 DAG 运行 的人必须有 AmazonMWAAAirflowCliAccess
政策。
我使用 apache airflow 创建了一些 DAGS,其中一些未按计划 运行。
我正在尝试找到一种方法,可以从 Python 脚本中为特定 DAG 触发 运行。这可能吗?我能怎么做?
编辑 --- python 脚本将来自与我所有 DAGS 所在的项目不同的项目运行ning
在触发 Airflow DAG 运行时,您有多种选择。
使用Python
airflow python 包提供了一个 local client 可用于从 python 脚本中触发 dag。例如:
from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='test_dag_id', run_id='test_run_id', conf={})
使用 Airflow CLI
您可以使用 Airflow CLI 手动触发气流中的 dag。有关如何使用 CLI 触发 DAG 的更多信息,请参见 here。
使用 Airflow REST API
您还可以使用 Airflow REST api 来触发 DAG 运行。更多信息 here。
python 中的第一个选项可能最适合您(我个人过去也是这样做的)。但理论上您可以使用 subprocess to interact with the CLI from python, or a library like requests 从 Python.
中与 REST API 交互在 AWS MWAA Airflow 1.10.12 上,我使用了基于 boto3
库的解决方案 Python 和 REST POST 请求:
import boto3
import requests
def TriggerAirflowDAG(mwaa_environment, dag_id):
client = boto3.client("mwaa")
token = client.create_cli_token(Name=mwaa_environment)
url = "https://{0}/aws_mwaa/cli".format(token["WebServerHostname"])
body = f"trigger_dag {dag_id}"
headers = {
"Authorization": "Bearer " + token["CliToken"],
"Content-Type": "text/plain"
}
return requests.post(url, data=body, headers=headers)
User/role 发起 DAG 运行 的人必须有 AmazonMWAAAirflowCliAccess
政策。