如何将数据库连接传递到 Airflow KubernetesPodOperator
How to pass a database connection into Airflow KubernetesPodOperator
我对 Airflow
中的 KubernetesPodOperator
感到困惑,我想知道如何传递存储有 conn_id
参数的 load_users_into_table()
函数在 Pod 中 Airflow
的 connection
中 ?
在官方文档中建议将 conn_id
放在 Secret
中,但我不明白如何在我的函数 load_users_into_table()
之后传递它。
https://airflow.apache.org/docs/stable/kubernetes.html
pod中要执行的函数(任务):
def load_users_into_table(postgres_hook, schema, path):
gdf = read_csv(path)
gdf.to_sql('users', con=postgres_hook.get_sqlalchemy_engine(), schema=schema)
狗:
_pg_hook = PostgresHook(postgres_conn_id = _conn_id)
with dag:
test = KubernetesPodOperator(
namespace=namespace,
image=image_name,
cmds=["python", "-c"],
arguments=[load_users_into_table],
labels={"dag-id": dag.dag_id},
name="airflow-test-pod",
task_id="task-1",
is_delete_operator_pod=True,
in_cluster=in_cluster,
get_logs=True,
config_file=config_file,
executor_config={
"KubernetesExecutor": {"request_memory": "512Mi",
"limit_memory": "1024Mi",
"request_cpu": "1",
"limit_cpu": "2"}
}
)
假设您想 运行 使用 K8sPodOperator,您可以使用 argparse 并向 docker 命令添加参数。这些行中的内容应该可以完成工作:
import argparse
def f(arg):
print(arg)
parser = argparse.ArgumentParser()
parser.add_argument('--foo', help='foo help')
args = parser.parse_args()
if __name__ == '__main__':
f(args.foo)
Docker 文件:
FROM python:3
COPY main.py main.py
CMD ["python", "main.py", "--foo", "somebar"]
还有其他方法可以解决这个问题,例如使用 secrets、configMaps 甚至 Airflow 变量,但这应该能让你继续前进。
我对 Airflow
中的 KubernetesPodOperator
感到困惑,我想知道如何传递存储有 conn_id
参数的 load_users_into_table()
函数在 Pod 中 Airflow
的 connection
中 ?
在官方文档中建议将 conn_id
放在 Secret
中,但我不明白如何在我的函数 load_users_into_table()
之后传递它。
https://airflow.apache.org/docs/stable/kubernetes.html
pod中要执行的函数(任务):
def load_users_into_table(postgres_hook, schema, path):
gdf = read_csv(path)
gdf.to_sql('users', con=postgres_hook.get_sqlalchemy_engine(), schema=schema)
狗:
_pg_hook = PostgresHook(postgres_conn_id = _conn_id)
with dag:
test = KubernetesPodOperator(
namespace=namespace,
image=image_name,
cmds=["python", "-c"],
arguments=[load_users_into_table],
labels={"dag-id": dag.dag_id},
name="airflow-test-pod",
task_id="task-1",
is_delete_operator_pod=True,
in_cluster=in_cluster,
get_logs=True,
config_file=config_file,
executor_config={
"KubernetesExecutor": {"request_memory": "512Mi",
"limit_memory": "1024Mi",
"request_cpu": "1",
"limit_cpu": "2"}
}
)
假设您想 运行 使用 K8sPodOperator,您可以使用 argparse 并向 docker 命令添加参数。这些行中的内容应该可以完成工作:
import argparse
def f(arg):
print(arg)
parser = argparse.ArgumentParser()
parser.add_argument('--foo', help='foo help')
args = parser.parse_args()
if __name__ == '__main__':
f(args.foo)
Docker 文件:
FROM python:3
COPY main.py main.py
CMD ["python", "main.py", "--foo", "somebar"]
还有其他方法可以解决这个问题,例如使用 secrets、configMaps 甚至 Airflow 变量,但这应该能让你继续前进。