我们如何验证 DAG 对象中是否没有循环

How can we validated whether there are no cycles in the DAG objects

我正在为我的 ETL 编写单元测试,作为一个过程,我想测试所有 Dag 以确保它们没有循环。在阅读了 Bas Harenslak 和 Julian de Ruiter 的 Data Pipelines with Apache Airflow 之后,我看到他们正在使用 DAG.test_cycle(),这里的 DAG 是从模块 airflow.models.dag 导入的,但是当我 运行 代码时我得到一个错误,AttributeError: 'DAG' object has no attribute 'test_cycle'

这是我的代码片段

import glob
import importlib
import os

import pytest
from airflow.models.dag import DAG

DAG_PATH = os.path.join(os.path.dirname(file), “…”, “…”, “dags/**/*.py”)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)

@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    module_name, _ = os.path.splitext(dag_file)
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]

    assert dag_objects

    for dag in dag_objects:
        dag.test_cycle()

在 Airflow 2.0.0 或更高版本中,您可以使用以 dag 作为参数的 test_cycle() 函数:

def test_cycle(dag):
    """
    Check to see if there are any cycles in the DAG. Returns False if no cycle found,
    otherwise raises exception.
    """

Source

像这样导入:

from airflow.utils.dag_cycle_tester import test_cycle

您可以在 DagBag class 的定义中找到示例。

万一有人在使用 test_cycle 方法时遇到与@adan11 提到的相同问题,问题是当我们编译我们的测试套件时,我们的测试工具即。 pytest,将在导入时解析 test_cycle 方法。

例如

# Pytest will collect this function
from airflow.utils.dag_cycle_tester import test_cycle

当我们使用如下别名时,Pytest 将忽略它:

from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle

这取决于您的测试配置。