如何使用 Python 在 Airflow DAG 中导入外部脚本?

How to import external scripts in a Airflow DAG with Python?

我有以下结构:

然后我尝试在 inbound_layer 的某些文件中导入脚本,如下所示:

import calc

但是我在 Airflow 网站上收到以下错误消息:

有什么想法吗?

对于airflow DAG,当你导入自己的模块时,你需要确保两件事:

  1. 模块在哪里?您需要找到气流文件夹中的根路径在哪里。例如,在我的开发箱中,文件夹是:

    ~/projects/data/airflow/teams/team_name/projects/default/dags/dag_names/dag_files.py

根是气流,所以如果我把我的模块 my_module 放在

~/projects/data/airflow/teams/team_name/common

那我需要用

from teams.team_name.common import my_module

在你的例子中,如果根目录是bi的上层文件夹,而你把calc的脚本放在bi/inbound_layer/test.py中,那么你可以使用:

from bi.inbound_layer.test import calc
  1. 并且您必须确保目录结构中有 \__init\__.py 个文件,导入才能正常运行。您应该在路径中的每个文件夹中有一个空文件 \__init\__.py。它表明此目录是气流包的一部分。在您的情况下,您可以使用 bi 和 _inbound_layer_ 文件夹下的 touch \__init\__.py (cli) 创建空的 __init\__.py.

我需要在 ren.py 的顶部插入以下脚本:

import sys, os
from airflow.models import Variable

DAGBAGS_DIR = Variable.get('DAGBAGS_DIR')
sys.path.append(DAGBAGS_DIR + '/bi/inbound_layer/')

这样我就可以使用当前文件夹包了。

Airflow 默认将 Airflow 主页中的 dags/, plugins/, and config/ 目录添加到 PYTHONPATH,因此您可以例如在 dags 文件夹下创建文件夹 commons,在那里创建文件(scriptFileName)。假设脚本有一些 class (GetJobDoneClass) 你想导入你的 DAG 你可以这样做:

from common.scriptFileName import GetJobDoneClass