Airflow - 如何将 xcom 变量传递给 Python 函数
Airflow - How to pass xcom variable into Python function
我需要引用一个由 BashOperator
返回的变量。在我的 task_archive_s3_file
中,我需要从 get_s3_file
获取文件名。该任务只是将 {{ ti.xcom_pull(task_ids=submit_file_to_spark) }}
打印为字符串而不是值。
如果我使用 bash_command
,值打印正确。
get_s3_file = PythonOperator(
task_id='get_s3_file',
python_callable=obj.func_get_s3_file,
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag)
submit_file_to_spark = BashOperator(
task_id='submit_file_to_spark',
bash_command="echo 'hello world'",
trigger_rule="all_done",
xcom_push=True,
dag=dag)
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
# bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
python_callable=obj.func_archive_s3_file,
params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
dag=dag)
get_s3_file >> submit_file_to_spark >> task_archive_s3_file
像 {{ ti.xcom_pull(...) }}
这样的模板只能在支持模板的参数内部使用,否则它们不会在执行前呈现。请参阅 PythonOperator and BashOperator 的 template_fields
、template_fields_renderers
和 template_ext
属性。
所以 templates_dict
是您用来将模板传递给 python 运算符的方式:
def func_archive_s3_file(**context):
archive(context['templates_dict']['s3_path_filename'])
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=obj.func_archive_s3_file,
provide_context=True, # must pass this because templates_dict gets passed via context
templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })
然而,在获取 XCom 值的情况下,另一种选择是仅使用通过上下文提供给您的 TaskInstance
对象:
def func_archive_s3_file(**context):
archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=obj.func_archive_s3_file,
provide_context=True,
对问题和答案都投了赞成票,但我认为对于那些只想在 DAG 中的 PythonOperator
任务之间传递小数据对象的用户来说,这可以更清楚一点。参考这个问题和 this XCom example 让我找到了以下解决方案。超级简单:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
DAG = DAG(
dag_id='example_dag',
start_date=datetime.now(),
schedule_interval='@once'
)
def push_function(**kwargs):
ls = ['a', 'b', 'c']
return ls
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
ls = ti.xcom_pull(task_ids='push_task')
print(ls)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
我不确定为什么会这样,但确实如此。社区的几个问题:
- 这里的
ti
发生了什么事? **kwargs
是如何内置的?
- 两个功能都需要
provide_context=True
吗?
非常欢迎任何使这个答案更清晰的编辑!
使用相同的代码和修改的参数,如 Startdate
等
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
DAG = DAG(
dag_id='simple_xcom',
default_args=args,
# start_date=datetime(2019, 04, 21),
schedule_interval="@daily",
#schedule_interval=timedelta(1)
)
def push_function(**context):
msg='the_message'
print("message to push: '%s'" % msg)
task_instance = context['task_instance']
task_instance.xcom_push(key="the_message", value=msg)
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
msg = ti.xcom_pull(task_ids='push_task',key='the_message')
print("received message: '%s'" % msg)
pull_task = PythonOperator(`enter code here`
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
想知道context['task_instance']
和kwargs['ti']
是从哪里来的,可以参考Airflow macro documentation
在 Airflow 2.0(2020 年 12 月发布)中,TaskFlow API 使通过 XComs 变得更容易。使用此 API,您可以简单地从带有 @task 注释的函数中获取 return 值,它们将在幕后作为 XComs 传递。教程中的示例:
@task()
def extract():
...
return order_data_dict
@task()
def transform(order_data_dict: dict):
...
return total_order_value
order_data = extract()
order_summary = transform(order_data)
在此示例中,order_data
的类型为 XComArg
。它存储由 extract
任务编辑的字典 return。当 transform
任务运行时,order_data
被解包,任务接收存储的普通 Python 对象。
如果你想在 airflow 2 中将 xcom 传递给 bash 操作员,请使用 env
;假设您已推送到 xcom my_xcom_var
,那么您可以在 env
中使用 jinja 来提取 xcom 值,例如
BashOperator(
task_id=mytask,
bash_command="echo ${MYVAR}",
env={"MYVAR": '{{ ti.xcom_pull(key=\'my_xcom_var\') }}'},
dag=dag
)
我需要引用一个由 BashOperator
返回的变量。在我的 task_archive_s3_file
中,我需要从 get_s3_file
获取文件名。该任务只是将 {{ ti.xcom_pull(task_ids=submit_file_to_spark) }}
打印为字符串而不是值。
如果我使用 bash_command
,值打印正确。
get_s3_file = PythonOperator(
task_id='get_s3_file',
python_callable=obj.func_get_s3_file,
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag)
submit_file_to_spark = BashOperator(
task_id='submit_file_to_spark',
bash_command="echo 'hello world'",
trigger_rule="all_done",
xcom_push=True,
dag=dag)
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
# bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
python_callable=obj.func_archive_s3_file,
params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
dag=dag)
get_s3_file >> submit_file_to_spark >> task_archive_s3_file
像 {{ ti.xcom_pull(...) }}
这样的模板只能在支持模板的参数内部使用,否则它们不会在执行前呈现。请参阅 PythonOperator and BashOperator 的 template_fields
、template_fields_renderers
和 template_ext
属性。
所以 templates_dict
是您用来将模板传递给 python 运算符的方式:
def func_archive_s3_file(**context):
archive(context['templates_dict']['s3_path_filename'])
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=obj.func_archive_s3_file,
provide_context=True, # must pass this because templates_dict gets passed via context
templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })
然而,在获取 XCom 值的情况下,另一种选择是仅使用通过上下文提供给您的 TaskInstance
对象:
def func_archive_s3_file(**context):
archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=obj.func_archive_s3_file,
provide_context=True,
对问题和答案都投了赞成票,但我认为对于那些只想在 DAG 中的 PythonOperator
任务之间传递小数据对象的用户来说,这可以更清楚一点。参考这个问题和 this XCom example 让我找到了以下解决方案。超级简单:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
DAG = DAG(
dag_id='example_dag',
start_date=datetime.now(),
schedule_interval='@once'
)
def push_function(**kwargs):
ls = ['a', 'b', 'c']
return ls
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
ls = ti.xcom_pull(task_ids='push_task')
print(ls)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
我不确定为什么会这样,但确实如此。社区的几个问题:
- 这里的
ti
发生了什么事?**kwargs
是如何内置的? - 两个功能都需要
provide_context=True
吗?
非常欢迎任何使这个答案更清晰的编辑!
使用相同的代码和修改的参数,如 Startdate
等
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
DAG = DAG(
dag_id='simple_xcom',
default_args=args,
# start_date=datetime(2019, 04, 21),
schedule_interval="@daily",
#schedule_interval=timedelta(1)
)
def push_function(**context):
msg='the_message'
print("message to push: '%s'" % msg)
task_instance = context['task_instance']
task_instance.xcom_push(key="the_message", value=msg)
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
msg = ti.xcom_pull(task_ids='push_task',key='the_message')
print("received message: '%s'" % msg)
pull_task = PythonOperator(`enter code here`
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
想知道context['task_instance']
和kwargs['ti']
是从哪里来的,可以参考Airflow macro documentation
在 Airflow 2.0(2020 年 12 月发布)中,TaskFlow API 使通过 XComs 变得更容易。使用此 API,您可以简单地从带有 @task 注释的函数中获取 return 值,它们将在幕后作为 XComs 传递。教程中的示例:
@task()
def extract():
...
return order_data_dict
@task()
def transform(order_data_dict: dict):
...
return total_order_value
order_data = extract()
order_summary = transform(order_data)
在此示例中,order_data
的类型为 XComArg
。它存储由 extract
任务编辑的字典 return。当 transform
任务运行时,order_data
被解包,任务接收存储的普通 Python 对象。
如果你想在 airflow 2 中将 xcom 传递给 bash 操作员,请使用 env
;假设您已推送到 xcom my_xcom_var
,那么您可以在 env
中使用 jinja 来提取 xcom 值,例如
BashOperator(
task_id=mytask,
bash_command="echo ${MYVAR}",
env={"MYVAR": '{{ ti.xcom_pull(key=\'my_xcom_var\') }}'},
dag=dag
)