如何从 Airflow 中的 Jinja 模板化字符串中获取 python 字典或列表?
How to get python dictionary or list from Jinja templated string in Airflow?
假设我有一个运算符需要一个 python 列表(或字典)作为它的参数 属性
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = [
("a", "x"),
("b", "y")
],
property_needs_dict = {
"dynamic_field_1": "dynamic_value",
# ...
"dynamic_field_N": "dynamic_value",
},
)
问题是我无法在DAG的创建时定义列表的python数据结构(需要多少个列表元素)或dict(生成了哪些字段)。
我只能通过执行先前的任务或宏来动态获取此结构。
- 任务可以将带有动态字段的数据结构写入 XCOM
- 宏可以return一个数据结构
但是在上述两种情况下都无法将动态数据结构(由 XCOM 或自定义宏 return 编辑)转换为 python 数据结构并将其用作属性 运营商。
这不会 return 列表或字典:
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = '{{ generate_list() }}',
property_needs_dict = '{{ generate_dict() }}',
)
这也不会 return 字典或列表:
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = '{{ ti.xcom_pull(task_ids="PreviousTask", key="list_structure") }}',
property_needs_dict = '{{ ti.xcom_pull(task_ids="PreviousTask", key="dict_structure") }}',
)
如果我使用类似 eval()
的函数,它将无法在任务执行时评估字符串参数。它会尝试在 DAG 的创建时间对其进行评估,但值显然不会存在。
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = eval('{{ ti.xcom_pull(task_ids="PreviousTask", key="list_structure") }}'),
property_needs_dict = eval('{{ ti.xcom_pull(task_ids="PreviousTask", key="dict_structure") }}'),
)
或
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = eval('{{ generate_list() }}'),
property_needs_dict = eval('{{ generate_dict() }}'),
)
我该如何解决这个问题?
我对 Airflow 1.x 最感兴趣,但我对 Aitflow 2.x 解决方案持开放态度。
谢谢!
在 Airflow 1 中,Jinja 表达式始终被评估为字符串。您必须对运算符进行子类化或将逻辑构建到您的自定义运算符中,以根据需要翻译字符串化的 list/dict arg。
但是,在 Airflow 2.1 中,添加了一个选项来将模板渲染为原生 Python 类型。您可以在 DAG 级别设置 render_templates_as_native_obj=True
,列表将呈现为真实列表,dict 呈现为真实字典等。查看文档 here.
假设我有一个运算符需要一个 python 列表(或字典)作为它的参数 属性
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = [
("a", "x"),
("b", "y")
],
property_needs_dict = {
"dynamic_field_1": "dynamic_value",
# ...
"dynamic_field_N": "dynamic_value",
},
)
问题是我无法在DAG的创建时定义列表的python数据结构(需要多少个列表元素)或dict(生成了哪些字段)。
我只能通过执行先前的任务或宏来动态获取此结构。
- 任务可以将带有动态字段的数据结构写入 XCOM
- 宏可以return一个数据结构
但是在上述两种情况下都无法将动态数据结构(由 XCOM 或自定义宏 return 编辑)转换为 python 数据结构并将其用作属性 运营商。
这不会 return 列表或字典:
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = '{{ generate_list() }}',
property_needs_dict = '{{ generate_dict() }}',
)
这也不会 return 字典或列表:
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = '{{ ti.xcom_pull(task_ids="PreviousTask", key="list_structure") }}',
property_needs_dict = '{{ ti.xcom_pull(task_ids="PreviousTask", key="dict_structure") }}',
)
如果我使用类似 eval()
的函数,它将无法在任务执行时评估字符串参数。它会尝试在 DAG 的创建时间对其进行评估,但值显然不会存在。
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = eval('{{ ti.xcom_pull(task_ids="PreviousTask", key="list_structure") }}'),
property_needs_dict = eval('{{ ti.xcom_pull(task_ids="PreviousTask", key="dict_structure") }}'),
)
或
doExampleTask = ExampleOperator(
task_id = "doExampleTask",
property_needs_list = eval('{{ generate_list() }}'),
property_needs_dict = eval('{{ generate_dict() }}'),
)
我该如何解决这个问题?
我对 Airflow 1.x 最感兴趣,但我对 Aitflow 2.x 解决方案持开放态度。
谢谢!
在 Airflow 1 中,Jinja 表达式始终被评估为字符串。您必须对运算符进行子类化或将逻辑构建到您的自定义运算符中,以根据需要翻译字符串化的 list/dict arg。
但是,在 Airflow 2.1 中,添加了一个选项来将模板渲染为原生 Python 类型。您可以在 DAG 级别设置 render_templates_as_native_obj=True
,列表将呈现为真实列表,dict 呈现为真实字典等。查看文档 here.