我们如何验证 DAG 对象中是否没有循环
How can we validated whether there are no cycles in the DAG objects
我正在为我的 ETL 编写单元测试,作为一个过程,我想测试所有 Dag 以确保它们没有循环。在阅读了 Bas Harenslak 和 Julian de Ruiter 的 Data Pipelines with Apache Airflow 之后,我看到他们正在使用 DAG.test_cycle(),这里的 DAG 是从模块 airflow.models.dag 导入的,但是当我 运行 代码时我得到一个错误,AttributeError: 'DAG' object has no attribute 'test_cycle'
这是我的代码片段
import glob
import importlib
import os
import pytest
from airflow.models.dag import DAG
DAG_PATH = os.path.join(os.path.dirname(file), “…”, “…”, “dags/**/*.py”)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
assert dag_objects
for dag in dag_objects:
dag.test_cycle()
在 Airflow 2.0.0
或更高版本中,您可以使用以 dag
作为参数的 test_cycle()
函数:
def test_cycle(dag):
"""
Check to see if there are any cycles in the DAG. Returns False if no cycle found,
otherwise raises exception.
"""
像这样导入:
from airflow.utils.dag_cycle_tester import test_cycle
您可以在 DagBag class 的定义中找到示例。
万一有人在使用 test_cycle 方法时遇到与@adan11 提到的相同问题,问题是当我们编译我们的测试套件时,我们的测试工具即。 pytest,将在导入时解析 test_cycle 方法。
例如
# Pytest will collect this function
from airflow.utils.dag_cycle_tester import test_cycle
当我们使用如下别名时,Pytest 将忽略它:
from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle
这取决于您的测试配置。
我正在为我的 ETL 编写单元测试,作为一个过程,我想测试所有 Dag 以确保它们没有循环。在阅读了 Bas Harenslak 和 Julian de Ruiter 的 Data Pipelines with Apache Airflow 之后,我看到他们正在使用 DAG.test_cycle(),这里的 DAG 是从模块 airflow.models.dag 导入的,但是当我 运行 代码时我得到一个错误,AttributeError: 'DAG' object has no attribute 'test_cycle'
这是我的代码片段
import glob
import importlib
import os
import pytest
from airflow.models.dag import DAG
DAG_PATH = os.path.join(os.path.dirname(file), “…”, “…”, “dags/**/*.py”)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
assert dag_objects
for dag in dag_objects:
dag.test_cycle()
在 Airflow 2.0.0
或更高版本中,您可以使用以 dag
作为参数的 test_cycle()
函数:
def test_cycle(dag):
"""
Check to see if there are any cycles in the DAG. Returns False if no cycle found,
otherwise raises exception.
"""
像这样导入:
from airflow.utils.dag_cycle_tester import test_cycle
您可以在 DagBag class 的定义中找到示例。
万一有人在使用 test_cycle 方法时遇到与@adan11 提到的相同问题,问题是当我们编译我们的测试套件时,我们的测试工具即。 pytest,将在导入时解析 test_cycle 方法。
例如
# Pytest will collect this function
from airflow.utils.dag_cycle_tester import test_cycle
当我们使用如下别名时,Pytest 将忽略它:
from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle
这取决于您的测试配置。