如何将变量 task_ids 传递给 Airflow 中的 xcom.pull?
How to pass a variable task_ids to a xcom.pull in Airflow?
有时我发现使用循环创建任务很方便。
下面是一个 SqoopOperator 示例,我在 where 子句中使用了之前 PythonOperator 中的 xcom 值。我正在尝试使用变量 get_delivery_sqn_task_id
来访问正确的 xcom 值 ti.xcom_pull(task_ids=get_delivery_sqn_task_id
,但这不起作用 (returns ()).
我可以将所有内容都从循环中删除,但我认为这会使代码非常难看。是否有一个优雅的解决方案来使用变量 task_ids 来检索 xcom 值?我想最好的解决方案是使用气流变量。
for table in tables:
get_delivery_sqn_task_id ='get_delivery_sqn_'+ table
get_delivery_sqn_task = PythonOperator(
task_id = get_delivery_sqn_task_id,
python_callable = get_delivery_sqn,
op_kwargs = {
'table_name': table
},
provide_context = True,
dag = dag
)
sqoop_operator_task = SqoopOperator(
task_id = "sqoop_"+table,
conn_id = "DWDH_PROD",
table = table,
cmd_type = "import",
target_dir = "/sourcedata/sqoop_tmp/"+table,
num_mappers = 1,
where = "delivery_sqn > {{ ti.xcom_pull(task_ids=get_delivery_sqn_task_id, key='return_value') }}",
dag = dag
)
你可以这样做:
"delivery_sqn > {{{{ ti.xcom_pull(task_ids={}, key='return_value') }}}}".format(get_delivery_sqn_task_id)
有时我发现使用循环创建任务很方便。
下面是一个 SqoopOperator 示例,我在 where 子句中使用了之前 PythonOperator 中的 xcom 值。我正在尝试使用变量 get_delivery_sqn_task_id
来访问正确的 xcom 值 ti.xcom_pull(task_ids=get_delivery_sqn_task_id
,但这不起作用 (returns ()).
我可以将所有内容都从循环中删除,但我认为这会使代码非常难看。是否有一个优雅的解决方案来使用变量 task_ids 来检索 xcom 值?我想最好的解决方案是使用气流变量。
for table in tables:
get_delivery_sqn_task_id ='get_delivery_sqn_'+ table
get_delivery_sqn_task = PythonOperator(
task_id = get_delivery_sqn_task_id,
python_callable = get_delivery_sqn,
op_kwargs = {
'table_name': table
},
provide_context = True,
dag = dag
)
sqoop_operator_task = SqoopOperator(
task_id = "sqoop_"+table,
conn_id = "DWDH_PROD",
table = table,
cmd_type = "import",
target_dir = "/sourcedata/sqoop_tmp/"+table,
num_mappers = 1,
where = "delivery_sqn > {{ ti.xcom_pull(task_ids=get_delivery_sqn_task_id, key='return_value') }}",
dag = dag
)
你可以这样做:
"delivery_sqn > {{{{ ti.xcom_pull(task_ids={}, key='return_value') }}}}".format(get_delivery_sqn_task_id)