在 Airflow 的运算符参数中将 jinja 模板格式化为 INT
Format jinja template as INT in operator parameter in Airflow
我正在尝试将 jinja 模板参数格式化为整数,以便我可以将其传递给需要 INT(可以是自定义或 PythonOperator)的运算符,但我无法做到。
请参阅下面的示例 DAG。我正在使用内置的 Jinja 过滤器 | int
但它不起作用 - 类型仍然是 <class 'str'>
我对 Airflow 还是个新手,但根据我读到的 Jinja/Airflow 作品,我认为这是不可能的。我看到两个主要的解决方法:
- 更改运算符参数以期望字符串并处理下面的转换。
在 单独的 PythonOperator 中处理此转换,它将字符串转换为 int 并使用 xcom/task 上下文导出它。 (我认为这会起作用但不确定)
请让我知道任何其他解决方法
def greet(mystr):
print (mystr)
print(type(mystr))
default_args = {
'owner': 'airflow',
'start_date': days_ago(2)
}
dag = DAG(
'template_dag',
default_args=default_args,
description='template',
schedule_interval='0 13 * * *'
)
with dag:
# foo = "{{ var.value.my_custom_var | int }}" # from variable
foo = "{{ execution_date.int_timestamp | int }}" # built in macro
# could be MyCustomOperator
opr_greet = PythonOperator(task_id='greet',
python_callable=greet,
op_kwargs={'mystr': foo}
)
opr_greet
Airflow 1.10.11
我相信 Jinja 总是会 return 你一个字符串:模板是一个字符串,替换模板中的值将 return 你一个字符串。
如果你确定 foo 总是一个整数,你可以这样做
opr_greet = PythonOperator(task_id='greet',
python_callable=greet,
op_kwargs={'mystr': int(foo)}
)
更新: 看起来 Airflow uses the render
method from Jinja2, which returns 一个 Unicode 字符串。
此时,如果能修改greet
,在那个函数中管理输入参数就更容易了。
更新的答案:
从 Airflow 2.1 开始,您可以将 render_template_as_native_obj=True
传递给 dag,Airflow 将 return Python 类型(dict、int 等)而不是字符串。看到这个 pull request
dag = DAG(
dag_id="example_template_as_python_object",
schedule_interval=None,
start_date=days_ago(2),
render_template_as_native_obj=True,
)
以前版本的旧答案:
我发现了一个提供最佳解决方法的相关问题,IMO。
诀窍是使用 Python运算符,在那里进行数据类型转换,然后使用参数调用主运算符。下面是将 json 字符串转换为字典对象的示例。同样可以申请string转int等
def my_func(ds, **kwargs):
ti = kwargs['ti']
body = ti.xcom_pull(task_ids='privious_task_id')
import_body= json.loads(body)
op = CloudSqlInstanceImportOperator(
project_id=GCP_PROJECT_ID,
body= import_body,
instance=INSTANCE_NAME,
gcp_conn_id='postgres_default',
task_id='sql_import_task',
validate_body=True,
)
op.execute()
p = PythonOperator(task_id='python_task', python_callable=my_func)
我正在尝试将 jinja 模板参数格式化为整数,以便我可以将其传递给需要 INT(可以是自定义或 PythonOperator)的运算符,但我无法做到。
请参阅下面的示例 DAG。我正在使用内置的 Jinja 过滤器 | int
但它不起作用 - 类型仍然是 <class 'str'>
我对 Airflow 还是个新手,但根据我读到的 Jinja/Airflow 作品,我认为这是不可能的。我看到两个主要的解决方法:
- 更改运算符参数以期望字符串并处理下面的转换。
在 单独的 PythonOperator 中处理此转换,它将字符串转换为 int 并使用 xcom/task 上下文导出它。 (我认为这会起作用但不确定)
请让我知道任何其他解决方法
def greet(mystr):
print (mystr)
print(type(mystr))
default_args = {
'owner': 'airflow',
'start_date': days_ago(2)
}
dag = DAG(
'template_dag',
default_args=default_args,
description='template',
schedule_interval='0 13 * * *'
)
with dag:
# foo = "{{ var.value.my_custom_var | int }}" # from variable
foo = "{{ execution_date.int_timestamp | int }}" # built in macro
# could be MyCustomOperator
opr_greet = PythonOperator(task_id='greet',
python_callable=greet,
op_kwargs={'mystr': foo}
)
opr_greet
Airflow 1.10.11
我相信 Jinja 总是会 return 你一个字符串:模板是一个字符串,替换模板中的值将 return 你一个字符串。
如果你确定 foo 总是一个整数,你可以这样做
opr_greet = PythonOperator(task_id='greet',
python_callable=greet,
op_kwargs={'mystr': int(foo)}
)
更新: 看起来 Airflow uses the render
method from Jinja2, which returns 一个 Unicode 字符串。
此时,如果能修改greet
,在那个函数中管理输入参数就更容易了。
更新的答案:
从 Airflow 2.1 开始,您可以将 render_template_as_native_obj=True
传递给 dag,Airflow 将 return Python 类型(dict、int 等)而不是字符串。看到这个 pull request
dag = DAG(
dag_id="example_template_as_python_object",
schedule_interval=None,
start_date=days_ago(2),
render_template_as_native_obj=True,
)
以前版本的旧答案:
我发现了一个提供最佳解决方法的相关问题,IMO。
诀窍是使用 Python运算符,在那里进行数据类型转换,然后使用参数调用主运算符。下面是将 json 字符串转换为字典对象的示例。同样可以申请string转int等
def my_func(ds, **kwargs):
ti = kwargs['ti']
body = ti.xcom_pull(task_ids='privious_task_id')
import_body= json.loads(body)
op = CloudSqlInstanceImportOperator(
project_id=GCP_PROJECT_ID,
body= import_body,
instance=INSTANCE_NAME,
gcp_conn_id='postgres_default',
task_id='sql_import_task',
validate_body=True,
)
op.execute()
p = PythonOperator(task_id='python_task', python_callable=my_func)