Airflow - 使用 Jinja 模板设置操作员的重试次数
Airflow - Set retries of operator using Jinja template
我有一组 Airflow DAG,每个都有一组与之关联的标签。我还有一个 PythonOperator,我所有的 DAG 都使用它。
如果 DAG 应用了特定标签,我需要设置 Operator 的 retries
参数,否则不需要。
代码看起来像这样:
dag.py
from common import tasks
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.helpers import chain
dag = DAG(
'test_retry_dag',
tags=['abc','xyz'],
default_args=default_args,
schedule_interval='* * * * *',
)
with dag:
chain(*tasks())
common.py
def foo(job_name):
raise Exception()
def tasks():
return [PythonOperator(
task_id='auto_retry',
retries='{{5 if "abc" in dag.tags else 0 }}',
python_callable=foo
)]
因为 tasks
是在一个单独的模块中定义的,所以我无法访问那里的 DAG 对象(将它作为参数传递是不可行的,因为它会涉及在非常大的每个 DAG 中进行更改代码库。因此我觉得使用 Jinja 模板是可行的方法,但是上面的代码不起作用,我认为我使用模板的方式是错误的。
有人可以帮帮我吗?
注意:我正在使用 Airflow 1.10,目前无法更新到 Airflow 2
如果您查看 BaseOperator 代码,那么您会发现在不在构造函数中传递 dag 参数的情况下创建一个运算符实例(在您的例子中是 PythonOperator)会从 settings.CONTEXT_MANAGER_DAG
.
获取其值
https://github.com/apache/airflow/blob/1.10.15/airflow/models/baseoperator.py#L420-L421
鉴于您描述的任务方法的用法,它看起来总是使用上下文 dag。然后您可以更改它以执行以下操作。
from airflow import settings
def foo(job_name):
raise Exception()
def tasks():
dag = settings.CONTEXT_MANAGER_DAG
return [PythonOperator(
task_id='auto_retry',
retries=5 if "abc" in dag.tags else 0,
python_callable=foo
)]
我有一组 Airflow DAG,每个都有一组与之关联的标签。我还有一个 PythonOperator,我所有的 DAG 都使用它。
如果 DAG 应用了特定标签,我需要设置 Operator 的 retries
参数,否则不需要。
代码看起来像这样:
dag.py
from common import tasks
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.helpers import chain
dag = DAG(
'test_retry_dag',
tags=['abc','xyz'],
default_args=default_args,
schedule_interval='* * * * *',
)
with dag:
chain(*tasks())
common.py
def foo(job_name):
raise Exception()
def tasks():
return [PythonOperator(
task_id='auto_retry',
retries='{{5 if "abc" in dag.tags else 0 }}',
python_callable=foo
)]
因为 tasks
是在一个单独的模块中定义的,所以我无法访问那里的 DAG 对象(将它作为参数传递是不可行的,因为它会涉及在非常大的每个 DAG 中进行更改代码库。因此我觉得使用 Jinja 模板是可行的方法,但是上面的代码不起作用,我认为我使用模板的方式是错误的。
有人可以帮帮我吗?
注意:我正在使用 Airflow 1.10,目前无法更新到 Airflow 2
如果您查看 BaseOperator 代码,那么您会发现在不在构造函数中传递 dag 参数的情况下创建一个运算符实例(在您的例子中是 PythonOperator)会从 settings.CONTEXT_MANAGER_DAG
.
https://github.com/apache/airflow/blob/1.10.15/airflow/models/baseoperator.py#L420-L421
鉴于您描述的任务方法的用法,它看起来总是使用上下文 dag。然后您可以更改它以执行以下操作。
from airflow import settings
def foo(job_name):
raise Exception()
def tasks():
dag = settings.CONTEXT_MANAGER_DAG
return [PythonOperator(
task_id='auto_retry',
retries=5 if "abc" in dag.tags else 0,
python_callable=foo
)]