如何在虚拟环境中使用apache airflow?

How to use apache airflow in a virtual environment?

我对使用 apache 气流还很陌生。我使用 pycharm 作为我的 IDE。我创建了一个项目(anaconda 环境),创建了一个包含 DAG 定义和 Bash 运算符的 python 脚本。当我打开我的气流网络服务器时,我的 DAGS 没有显示。仅显示默认示例 DAG。我的 AIRFLOW_HOME 变量包含 ~/airflow。所以我在那里存储了我的 python 脚本,现在它显示了。

如何在项目环境中使用它?

是否在每个项目开始时更改环境变量?

有没有办法为每个项目添加特定的气流主目录?

我不想将我的 DAG 存储在默认的气流目录中,因为我想将它添加到我的 git 存储库中。请帮助我。

编辑 airflow.cfg 文件并设置:

load_examples = False
dags_folder = /path/to/your/dag/files

如果您的气流目录未设置为默认目录,则应设置此环境变量。如果每次都更改它很烦人,只需在您的 pycharm 项目配置或本地 OS (~/.bashrc).

中设置它

我的建议是写一个小脚本在本地模式下执行airflow。 你应该单独执行 airflow webserver&scheduler&workers。

对我来说,在小型开发机器上运行 airflow 服务更方便,考虑这样做。

您可以 set/override 在 ${AIRFLOW_HOME}/airflow.cfg 中使用环境变量指定的气流选项,使用以下格式:$AIRFLOW__{SECTION}__{KEY}(注意双下划线)。这是气流文档的 link。所以你可以简单地做

export AIRFLOW__CORE__DAGS_FOLDER=/path/to/dags/folder

但是,对于不同的项目,这样做既繁琐又容易出错。作为替代方案,您可以考虑使用 pipenvpipenv for managing virtual environments instead of Anaconda. Here is a nice guide about pipenv and problems it solves. One of the default features,因为当您在激活 virtualenv 的情况下生成 shell 时,它会自动加载在 .env 文件中定义的变量。所以这就是您使用 pipenv 的工作流程:

cd /path/to/my_project

# Creates venv with python 3.7 
pipenv install --python=3.7 Flask==1.0.3 apache-airflow==1.10.3

# Set home for airflow in a root of your project (specified in .env file)
echo "AIRFLOW_HOME=${PWD}/airflow" >> .env

# Enters created venv and loads content of .env file 
pipenv shell

# Initialize airflow
airflow initdb
mkdir -p ${AIRFLOW_HOME}/dags/

Note: usage of Flask==1.03 I will explain at the end, but this is because pipenv checks whether sub-dependencies are compatible in order to ensure reproducibility.

所以在这些步骤之后你会得到以下项目结构

my_project
├── airflow
│   ├── airflow.cfg
│   ├── airflow.db
│   ├── dags
│   ├── logs
│   │   └── scheduler
│   │       ├── 2019-07-07
│   │       └── latest -> /path/to/my_project/airflow/logs/scheduler/2019-07-07
│   └── unittests.cfg
├── .env
├── Pipfile
└── Pipfile.lock

现在,当您第一次初始化 airflow 时,它将创建 ${AIRFLOW_HOME}/airflow.cfg 文件并将 use/expand ${AIRFLOW_HOME}/dags 作为 dags_folder 的值。如果您仍然需要 dags_folder 的不同位置,您可以再次使用 .env 文件

echo "AIRFLOW__CORE__DAGS_FOLDER=/different/path/to/dags/folder" >> .env

因此,您的 .env 文件将如下所示:

AIRFLOW_HOME=/path/to/my_project/airflow
AIRFLOW__CORE__DAGS_FOLDER=/different/path/to/dags/folder

我们取得了什么成就以及为什么这会很好

  1. 由于您在虚拟环境中安装了 airflow,因此您需要激活它才能使用 airflow
  2. 因为你是用 pipenv 做的,你需要使用 pipenv shell 才能激活 venv
  3. 因为你使用 pipenv shell,你总是会把 .env 中定义的变量导出到你的 venv 中。最重要的是 pipenv 仍然是一个子 shell,因此,当您退出它时,所有额外的环境变量也将被清除。
  4. 使用气流的不同项目将有不同的日志文件等位置。

关于 pipenv 的补充说明

  1. 为了使用通过 pipenv 创建的 venv 作为您的 IDE 的项目解释器,请使用 pipenv --py.
  2. 提供的路径
  3. 默认情况下,pipenv 像 conda 一样在同一个全局位置创建所有 venvs,但您可以通过将 export PIPENV_VENV_IN_PROJECT=1 添加到您的 .bashrc(或其他 rc)。然后PyCharm进入工程解释器的设置就可以自动获取了。

Flask==1.0.3

用法注意事项

来自 PyPi 的 Airflow 1.10.3 依赖于 flask>=1.0, <2.0jinja2>=2.7.3, <=2.10.0。今天,当我测试代码片段时,最新可用的 flask1.1.0,它取决于 jinja2>=2.10.1。这意味着虽然 pipenv 可以安装所有需要的软件,但是它无法锁定依赖项。因此,为了干净地使用我的代码示例,我必须指定 flask 的版本,这需要 jinja2 的版本与气流要求兼容。但是没有什么可担心的。 GitHub 上的最新版本 airflow 已经修复了这个问题。