Airflow - 使用 Jinja 模板设置操作员的重试次数

Airflow - Set retries of operator using Jinja template

我有一组 Airflow DAG,每个都有一组与之关联的标签。我还有一个 PythonOperator,我所有的 DAG 都使用它。

如果 DAG 应用了特定标签,我需要设置 Operator 的 retries 参数,否则不需要。

代码看起来像这样:

dag.py

from common import tasks

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.helpers import chain

dag = DAG(
'test_retry_dag',
tags=['abc','xyz'],
default_args=default_args,
schedule_interval='* * * * *',
)
    

with dag:
    chain(*tasks())

common.py

def foo(job_name):
    raise Exception()

def tasks():
    return [PythonOperator(
        task_id='auto_retry',
        retries='{{5 if "abc" in  dag.tags else 0 }}',
        python_callable=foo
    )]

因为 tasks 是在一个单独的模块中定义的,所以我无法访问那里的 DAG 对象(将它作为参数传递是不可行的,因为它会涉及在非常大的每个 DAG 中进行更改代码库。因此我觉得使用 Jinja 模板是可行的方法,但是上面的代码不起作用,我认为我使用模板的方式是错误的。

有人可以帮帮我吗?

注意:我正在使用 Airflow 1.10,目前无法更新到 Airflow 2

如果您查看 BaseOperator 代码,那么您会发现在不在构造函数中传递 dag 参数的情况下创建一个运算符实例(在您的例子中是 PythonOperator)会从 settings.CONTEXT_MANAGER_DAG.

获取其值

https://github.com/apache/airflow/blob/1.10.15/airflow/models/baseoperator.py#L420-L421

鉴于您描述的任务方法的用法,它看起来总是使用上下文 dag。然后您可以更改它以执行以下操作。

from airflow import settings

def foo(job_name):
    raise Exception()

def tasks():
    dag = settings.CONTEXT_MANAGER_DAG
    return [PythonOperator(
        task_id='auto_retry',
        retries=5 if "abc" in dag.tags else 0,
        python_callable=foo
    )]