气流 - Pytest - 未找到夹具 'dag'

Airflow - Pytest - fixture 'dag' not found

我正在尝试为气流中的 DAG 编写一些完整性测试。我目前正在使用 airflow.utils.dag_cycle_tester 中的 test_cycle 函数测试循环 DAG。我的代码如下:

import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
from airflow.utils.dag_cycle_tester import test_cycle

DAG_PATH = os.path.join(
 os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)

print(DAG_FILES)

@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    module_name, _ = os.path.splitext(dag_file)
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, 
    module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)
    dag_objects = [var for var in vars(module).values() if isinstance(var, 
    DAG)]
    assert dag_objects
    for dag in dag_objects:
        test_cycle(dag)

出于某种原因,当我 运行 使用 pytest tests/ 时,出现以下错误:

___________ERROR 在 test_cycle ___________

的设置

def test_cycle(dag):

未找到 E 夹具 'dag'

可用装置:anyio_backend、anyio_backend_name、anyio_backend_options、缓存、capfd、capfdbinary、caplog、capsys、capsysbinary、celery_config、celery_enable_logging, cov, doctest_namespace, monkeypatch, no_cover, pytestconfig, record_property, record_testsuite_property, record_xml_attribute, recwarn, tmp_path, tmp_path_工厂,tmpdir,tmpdir_factory 使用 'pytest --fixtures [testpath]' 寻求帮助。

任何关于导致这种情况的想法将不胜感激。谢谢!

虽然失败实际上是在您显示的另一个函数中,但我可以解释发生了什么。

问题:
Pytest 选择以 test_ 开头的每个函数。因此,当您的函数 test_cycletest_ 开始时,它也将 运行 作为单独的测试。在单独的测试中,它试图找到不存在的夹具 dag

因此解决方案是:
将函数重命名为 cycle_dag 应该可以修复它:)