Dag 和任务的气流 structure/organization
Airflow structure/organization of Dags and tasks
我的问题:
- 为了组织任务和任务,什么是好的目录结构? (dags 示例仅显示几个任务)
- 我目前将我的 dags 放在 dags 文件夹的根目录下,将我的任务放在不同的目录中,不确定这样做的方法吗?
- 我们应该使用 zip 文件吗? https://github.com/apache/incubator-airflow/blob/a1f4227bee1a70531cfa90769149322513cb6f92/airflow/models.py#L280
我也很乐意与其他人一起对文件夹结构进行基准测试。也许这取决于您使用 Airflow 的目的,但我会分享我的案例。我正在做数据管道来构建数据仓库,所以在高层次上我基本上有两个步骤:
- 将大量数据转储到数据湖中(只有少数人可以直接访问)
- 将数据从数据湖加载到分析数据库中,数据将在其中建模并公开给仪表板应用程序(许多 sql 查询来建模数据)
今天我将文件整理到三个主要文件夹中,试图反映上述逻辑:
├── dags
│ ├── dag_1.py
│ └── dag_2.py
├── data-lake
│ ├── data-source-1
│ └── data-source-2
└── dw
├── cubes
│ ├── cube_1.sql
│ └── cube_2.sql
├── dims
│ ├── dim_1.sql
│ └── dim_2.sql
└── facts
├── fact_1.sql
└── fact_2.sql
这或多或少是我的基本文件夹结构。
我用的是这样的。
- 一个项目通常是完全独立或独特的。也许 DAG 可以处理我们从某个客户端收到的文件,这些文件与其他所有内容完全无关(几乎可以肯定是一个单独的数据库模式)
- 我在一个公共文件夹中有我的运算符、挂钩和一些帮助脚本(删除某个 DAG 等的所有 Airflow 数据)
- 我曾经对整个 Airflow 文件夹有一个 git 存储库,但现在我每个项目都有一个单独的 git是如此无关)。这意味着每个项目文件夹也作为 .git 和 .gitignore 等
- 我倾向于保存原始数据,然后 'rest' 修改后的数据副本,这正是复制到数据库中的数据。由于来自不同客户端的不同格式(Excel、网络抓取、HTML 电子邮件抓取、平面文件、来自 SalesForce 或其他数据库源的查询...),我不得不大量修改一些原始数据...)
示例树:
├───dags
│ ├───common
│ │ ├───hooks
│ │ │ pysftp_hook.py
│ │ │
│ │ ├───operators
│ │ │ docker_sftp.py
│ │ │ postgres_templated_operator.py
│ │ │
│ │ └───scripts
│ │ delete.py
│ │
│ ├───project_1
│ │ │ dag_1.py
│ │ │ dag_2.py
│ │ │
│ │ └───sql
│ │ dim.sql
│ │ fact.sql
│ │ select.sql
│ │ update.sql
│ │ view.sql
│ │
│ └───project_2
│ │ dag_1.py
│ │ dag_2.py
│ │
│ └───sql
│ dim.sql
│ fact.sql
│ select.sql
│ update.sql
│ view.sql
│
└───data
├───project_1
│ ├───modified
│ │ file_20180101.csv
│ │ file_20180102.csv
│ │
│ └───raw
│ file_20180101.csv
│ file_20180102.csv
│
└───project_2
├───modified
│ file_20180101.csv
│ file_20180102.csv
│
└───raw
file_20180101.csv
file_20180102.csv
2021 年 10 月更新。我现在有一个用于所有项目的存储库。我所有的转换脚本都在插件文件夹中(其中还包含挂钩和运算符——基本上是我导入到我的 DAG 中的任何代码)。 DAG 代码我尽量保持简洁,所以它基本上只规定了时间表以及数据加载的位置。
├───dags
│ │
│ ├───project_1
│ │ dag_1.py
│ │ dag_2.py
│ │
│ └───project_2
│ dag_1.py
│ dag_2.py
│
├───plugins
│ ├───hooks
│ │ pysftp_hook.py
| | servicenow_hook.py
│ │
│ ├───sensors
│ │ ftp_sensor.py
| | sql_sensor.py
| |
│ ├───operators
│ │ servicenow_to_azure_blob_operator.py
│ │ postgres_templated_operator.py
│ |
│ ├───scripts
│ ├───project_1
| | transform_cases.py
| | common.py
│ ├───project_2
| | transform_surveys.py
| | common.py
│ ├───common
| helper.py
| dataset_writer.py
| .airflowignore
| Dockerfile
| docker-stack-airflow.yml
我的问题:
- 为了组织任务和任务,什么是好的目录结构? (dags 示例仅显示几个任务)
- 我目前将我的 dags 放在 dags 文件夹的根目录下,将我的任务放在不同的目录中,不确定这样做的方法吗?
- 我们应该使用 zip 文件吗? https://github.com/apache/incubator-airflow/blob/a1f4227bee1a70531cfa90769149322513cb6f92/airflow/models.py#L280
我也很乐意与其他人一起对文件夹结构进行基准测试。也许这取决于您使用 Airflow 的目的,但我会分享我的案例。我正在做数据管道来构建数据仓库,所以在高层次上我基本上有两个步骤:
- 将大量数据转储到数据湖中(只有少数人可以直接访问)
- 将数据从数据湖加载到分析数据库中,数据将在其中建模并公开给仪表板应用程序(许多 sql 查询来建模数据)
今天我将文件整理到三个主要文件夹中,试图反映上述逻辑:
├── dags
│ ├── dag_1.py
│ └── dag_2.py
├── data-lake
│ ├── data-source-1
│ └── data-source-2
└── dw
├── cubes
│ ├── cube_1.sql
│ └── cube_2.sql
├── dims
│ ├── dim_1.sql
│ └── dim_2.sql
└── facts
├── fact_1.sql
└── fact_2.sql
这或多或少是我的基本文件夹结构。
我用的是这样的。
- 一个项目通常是完全独立或独特的。也许 DAG 可以处理我们从某个客户端收到的文件,这些文件与其他所有内容完全无关(几乎可以肯定是一个单独的数据库模式)
- 我在一个公共文件夹中有我的运算符、挂钩和一些帮助脚本(删除某个 DAG 等的所有 Airflow 数据)
- 我曾经对整个 Airflow 文件夹有一个 git 存储库,但现在我每个项目都有一个单独的 git是如此无关)。这意味着每个项目文件夹也作为 .git 和 .gitignore 等
- 我倾向于保存原始数据,然后 'rest' 修改后的数据副本,这正是复制到数据库中的数据。由于来自不同客户端的不同格式(Excel、网络抓取、HTML 电子邮件抓取、平面文件、来自 SalesForce 或其他数据库源的查询...),我不得不大量修改一些原始数据...)
示例树:
├───dags
│ ├───common
│ │ ├───hooks
│ │ │ pysftp_hook.py
│ │ │
│ │ ├───operators
│ │ │ docker_sftp.py
│ │ │ postgres_templated_operator.py
│ │ │
│ │ └───scripts
│ │ delete.py
│ │
│ ├───project_1
│ │ │ dag_1.py
│ │ │ dag_2.py
│ │ │
│ │ └───sql
│ │ dim.sql
│ │ fact.sql
│ │ select.sql
│ │ update.sql
│ │ view.sql
│ │
│ └───project_2
│ │ dag_1.py
│ │ dag_2.py
│ │
│ └───sql
│ dim.sql
│ fact.sql
│ select.sql
│ update.sql
│ view.sql
│
└───data
├───project_1
│ ├───modified
│ │ file_20180101.csv
│ │ file_20180102.csv
│ │
│ └───raw
│ file_20180101.csv
│ file_20180102.csv
│
└───project_2
├───modified
│ file_20180101.csv
│ file_20180102.csv
│
└───raw
file_20180101.csv
file_20180102.csv
2021 年 10 月更新。我现在有一个用于所有项目的存储库。我所有的转换脚本都在插件文件夹中(其中还包含挂钩和运算符——基本上是我导入到我的 DAG 中的任何代码)。 DAG 代码我尽量保持简洁,所以它基本上只规定了时间表以及数据加载的位置。
├───dags
│ │
│ ├───project_1
│ │ dag_1.py
│ │ dag_2.py
│ │
│ └───project_2
│ dag_1.py
│ dag_2.py
│
├───plugins
│ ├───hooks
│ │ pysftp_hook.py
| | servicenow_hook.py
│ │
│ ├───sensors
│ │ ftp_sensor.py
| | sql_sensor.py
| |
│ ├───operators
│ │ servicenow_to_azure_blob_operator.py
│ │ postgres_templated_operator.py
│ |
│ ├───scripts
│ ├───project_1
| | transform_cases.py
| | common.py
│ ├───project_2
| | transform_surveys.py
| | common.py
│ ├───common
| helper.py
| dataset_writer.py
| .airflowignore
| Dockerfile
| docker-stack-airflow.yml