Dag 和任务的气流 structure/organization

Airflow structure/organization of Dags and tasks

我的问题:

我也很乐意与其他人一起对文件夹结构进行基准测试。也许这取决于您使用 Airflow 的目的,但我会分享我的案例。我正在做数据管道来构建数据仓库,所以在高层次上我基本上有两个步骤:

  1. 将大量数据转储到数据湖中(只有少数人可以直接访问)
  2. 将数据从数据湖加载到分析数据库中,数据将在其中建模并公开给仪表板应用程序(许多 sql 查询来建模数据)

今天我将文件整理到三个主要文件夹中,试图反映上述逻辑:

├── dags
│   ├── dag_1.py
│   └── dag_2.py
├── data-lake
│   ├── data-source-1
│   └── data-source-2
└── dw
    ├── cubes
    │   ├── cube_1.sql
    │   └── cube_2.sql
    ├── dims
    │   ├── dim_1.sql
    │   └── dim_2.sql
    └── facts
        ├── fact_1.sql
        └── fact_2.sql

这或多或少是我的基本文件夹结构。

我用的是这样的。

  • 一个项目通常是完全独立或独特的。也许 DAG 可以处理我们从某个客户端收到的文件,这些文件与其他所有内容完全无关(几乎可以肯定是一个单独的数据库模式)
  • 我在一个公共文件夹中有我的运算符、挂钩和一些帮助脚本(删除某个 DAG 等的所有 Airflow 数据)
  • 我曾经对整个 Airflow 文件夹有一个 git 存储库,但现在我每个项目都有一个单独的 git是如此无关)。这意味着每个项目文件夹也作为 .git 和 .gitignore 等
  • 我倾向于保存原始数据,然后 'rest' 修改后的数据副本,这正是复制到数据库中的数据。由于来自不同客户端的不同格式(Excel、网络抓取、HTML 电子邮件抓取、平面文件、来自 SalesForce 或其他数据库源的查询...),我不得不大量修改一些原始数据...)

示例树:

├───dags
│   ├───common
│   │   ├───hooks
│   │   │       pysftp_hook.py
│   │   │
│   │   ├───operators
│   │   │       docker_sftp.py
│   │   │       postgres_templated_operator.py
│   │   │
│   │   └───scripts
│   │           delete.py
│   │
│   ├───project_1
│   │   │   dag_1.py
│   │   │   dag_2.py
│   │   │
│   │   └───sql
│   │           dim.sql
│   │           fact.sql
│   │           select.sql
│   │           update.sql
│   │           view.sql
│   │
│   └───project_2
│       │   dag_1.py
│       │   dag_2.py
│       │
│       └───sql
│               dim.sql
│               fact.sql
│               select.sql
│               update.sql
│               view.sql
│
└───data
    ├───project_1
    │   ├───modified
    │   │       file_20180101.csv
    │   │       file_20180102.csv
    │   │
    │   └───raw
    │           file_20180101.csv
    │           file_20180102.csv
    │
    └───project_2
        ├───modified
        │       file_20180101.csv
        │       file_20180102.csv
        │
        └───raw
                file_20180101.csv
                file_20180102.csv

2021 年 10 月更新。我现在有一个用于所有项目的存储库。我所有的转换脚本都在插件文件夹中(其中还包含挂钩和运算符——基本上是我导入到我的 DAG 中的任何代码)。 DAG 代码我尽量保持简洁,所以它基本上只规定了时间表以及数据加载的位置。

├───dags
│   │
│   ├───project_1
│   │     dag_1.py
│   │     dag_2.py
│   │
│   └───project_2
│         dag_1.py
│         dag_2.py
│
├───plugins
│   ├───hooks
│   │      pysftp_hook.py
|   |      servicenow_hook.py
│   │   
│   ├───sensors
│   │      ftp_sensor.py
|   |      sql_sensor.py
|   |
│   ├───operators
│   │      servicenow_to_azure_blob_operator.py
│   │      postgres_templated_operator.py
│   |
│   ├───scripts
│       ├───project_1
|       |      transform_cases.py
|       |      common.py
│       ├───project_2
|       |      transform_surveys.py
|       |      common.py
│       ├───common
|             helper.py
|             dataset_writer.py
| .airflowignore
| Dockerfile
| docker-stack-airflow.yml