Python 气流 - Return 来自 Python 运算符的结果

Python Airflow - Return result from PythonOperator

我用多个 PythonOperators 写了一个 DAG

task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                          provide_context=True,
                          python_callable=Task1, dag=dag1)

def Task1(**kwargs):
    return(kwargs['dag_run'].conf.get('file'))

我正在从 PythonOperator 调用 "Task1" 方法。该方法正在返回一个值,该值我需要传递给下一个 PythonOperator.How 我可以从 "task1" 变量中获取值吗?或者我如何获取从 Task1 方法返回的值?

已更新:

    def Task1(**kwargs):
          file_name = kwargs['dag_run'].conf.get[file]
          task_instance = kwargs['task_instance']
          task_instance.xcom_push(key='file', value=file_name) 
          return file_name

  t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag)

  t2 =   BashOperator(
      task_id='Moving_bucket', 
      bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ',
      dag=dag,
    )

t2.set_upstream(t1)

您可能想查看 Airflow 的 XCOM:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

如果您 return 来自函数的值,则该值存储在 xcom 中。在您的情况下,您可以像这样从其他 Python 代码访问它:

task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')

或在这样的模板中:

{{ task_instance.xcom_pull(task_ids='Task1') }}

如果你想指定一个键,你可以推入 XCOM(在任务中):

task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)

之后您可以像这样访问它:

task_instance.xcom_pull(task_ids='my_task', key='the_key')

编辑 1

后续问题:我如何将值传递给另一个 Python运算符,而不是在另一个函数中使用值 - "t2 = "BashOperator(task_id ='Moving_bucket', bash_command='python /home/raw.py "%s" '%file_name, dag=dag)" --- 我想访问 file_name 是 returned通过“任务 1”。这将如何实现?

首先,在我看来,该值实际上 不是 传递给另一个 PythonOperator,而是传递给 BashOperator

其次,我上面的回答已经涵盖了这一点。字段 bash_command 是模板化的(参见来源中的 template_fieldshttps://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py)。因此,我们可以使用模板化版本:

BashOperator(
  task_id='Moving_bucket', 
  bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
  dag=dag,
)

编辑 2

说明: Airflow 是这样工作的:它将执行 Task1,然后填充 xcom,然后执行下一个任务。因此,为了使您的示例正常工作,您需要先执行 Task1,然后在 Task1 的下游执行 Moving_bucket。

由于您使用的是 return 函数,因此您也可以省略 xcom_pull 中的 key='file',而不是在函数中手动设置它。