如何将数据库连接传递到 Airflow KubernetesPodOperator

How to pass a database connection into Airflow KubernetesPodOperator

我对 Airflow 中的 KubernetesPodOperator 感到困惑,我想知道如何传递存储有 conn_id 参数的 load_users_into_table() 函数在 Pod 中 Airflowconnection 中 ?

在官方文档中建议将 conn_id 放在 Secret 中,但我不明白如何在我的函数 load_users_into_table() 之后传递它。

https://airflow.apache.org/docs/stable/kubernetes.html

pod中要执行的函数(任务):

def load_users_into_table(postgres_hook, schema, path):
  gdf = read_csv(path)
  gdf.to_sql('users', con=postgres_hook.get_sqlalchemy_engine(), schema=schema)

狗:

_pg_hook = PostgresHook(postgres_conn_id = _conn_id)

with dag: 
test = KubernetesPodOperator(
        namespace=namespace,
        image=image_name,
        cmds=["python", "-c"],
        arguments=[load_users_into_table],
        labels={"dag-id": dag.dag_id},
        name="airflow-test-pod",
        task_id="task-1",
        is_delete_operator_pod=True,
        in_cluster=in_cluster,
        get_logs=True,
        config_file=config_file,
        executor_config={
            "KubernetesExecutor": {"request_memory": "512Mi",
                                   "limit_memory": "1024Mi",
                                   "request_cpu": "1",
                                   "limit_cpu": "2"}
        }
    )

假设您想 运行 使用 K8sPodOperator,您可以使用 argparse 并向 docker 命令添加参数。这些行中的内容应该可以完成工作:

import argparse
    

def f(arg):
    print(arg)


parser = argparse.ArgumentParser()
parser.add_argument('--foo', help='foo help')
args = parser.parse_args()
    

if __name__ == '__main__':
    f(args.foo)

Docker 文件:

FROM python:3
COPY main.py main.py
CMD ["python", "main.py", "--foo", "somebar"]

还有其他方法可以解决这个问题,例如使用 secrets、configMaps 甚至 Airflow 变量,但这应该能让你继续前进。