Python 气流 - Return 来自 Python 运算符的结果
Python Airflow - Return result from PythonOperator
我用多个 PythonOperators 写了一个 DAG
task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
provide_context=True,
python_callable=Task1, dag=dag1)
def Task1(**kwargs):
return(kwargs['dag_run'].conf.get('file'))
我正在从 PythonOperator 调用 "Task1" 方法。该方法正在返回一个值,该值我需要传递给下一个 PythonOperator.How 我可以从 "task1" 变量中获取值吗?或者我如何获取从 Task1 方法返回的值?
已更新:
def Task1(**kwargs):
file_name = kwargs['dag_run'].conf.get[file]
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='file', value=file_name)
return file_name
t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag)
t2 = BashOperator(
task_id='Moving_bucket',
bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ',
dag=dag,
)
t2.set_upstream(t1)
您可能想查看 Airflow 的 XCOM:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
如果您 return 来自函数的值,则该值存储在 xcom 中。在您的情况下,您可以像这样从其他 Python 代码访问它:
task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')
或在这样的模板中:
{{ task_instance.xcom_pull(task_ids='Task1') }}
如果你想指定一个键,你可以推入 XCOM(在任务中):
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)
之后您可以像这样访问它:
task_instance.xcom_pull(task_ids='my_task', key='the_key')
编辑 1
后续问题:我如何将值传递给另一个 Python运算符,而不是在另一个函数中使用值 - "t2 = "BashOperator(task_id ='Moving_bucket', bash_command='python /home/raw.py "%s" '%file_name, dag=dag)" --- 我想访问 file_name 是 returned通过“任务 1”。这将如何实现?
首先,在我看来,该值实际上 不是 传递给另一个 PythonOperator
,而是传递给 BashOperator
。
其次,我上面的回答已经涵盖了这一点。字段 bash_command
是模板化的(参见来源中的 template_fields
:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py)。因此,我们可以使用模板化版本:
BashOperator(
task_id='Moving_bucket',
bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
dag=dag,
)
编辑 2
说明:
Airflow 是这样工作的:它将执行 Task1,然后填充 xcom,然后执行下一个任务。因此,为了使您的示例正常工作,您需要先执行 Task1,然后在 Task1 的下游执行 Moving_bucket。
由于您使用的是 return 函数,因此您也可以省略 xcom_pull
中的 key='file'
,而不是在函数中手动设置它。
我用多个 PythonOperators 写了一个 DAG
task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
provide_context=True,
python_callable=Task1, dag=dag1)
def Task1(**kwargs):
return(kwargs['dag_run'].conf.get('file'))
我正在从 PythonOperator 调用 "Task1" 方法。该方法正在返回一个值,该值我需要传递给下一个 PythonOperator.How 我可以从 "task1" 变量中获取值吗?或者我如何获取从 Task1 方法返回的值?
已更新:
def Task1(**kwargs):
file_name = kwargs['dag_run'].conf.get[file]
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='file', value=file_name)
return file_name
t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag)
t2 = BashOperator(
task_id='Moving_bucket',
bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ',
dag=dag,
)
t2.set_upstream(t1)
您可能想查看 Airflow 的 XCOM:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
如果您 return 来自函数的值,则该值存储在 xcom 中。在您的情况下,您可以像这样从其他 Python 代码访问它:
task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')
或在这样的模板中:
{{ task_instance.xcom_pull(task_ids='Task1') }}
如果你想指定一个键,你可以推入 XCOM(在任务中):
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)
之后您可以像这样访问它:
task_instance.xcom_pull(task_ids='my_task', key='the_key')
编辑 1
后续问题:我如何将值传递给另一个 Python运算符,而不是在另一个函数中使用值 - "t2 = "BashOperator(task_id ='Moving_bucket', bash_command='python /home/raw.py "%s" '%file_name, dag=dag)" --- 我想访问 file_name 是 returned通过“任务 1”。这将如何实现?
首先,在我看来,该值实际上 不是 传递给另一个 PythonOperator
,而是传递给 BashOperator
。
其次,我上面的回答已经涵盖了这一点。字段 bash_command
是模板化的(参见来源中的 template_fields
:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py)。因此,我们可以使用模板化版本:
BashOperator(
task_id='Moving_bucket',
bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
dag=dag,
)
编辑 2
说明: Airflow 是这样工作的:它将执行 Task1,然后填充 xcom,然后执行下一个任务。因此,为了使您的示例正常工作,您需要先执行 Task1,然后在 Task1 的下游执行 Moving_bucket。
由于您使用的是 return 函数,因此您也可以省略 xcom_pull
中的 key='file'
,而不是在函数中手动设置它。