在 Airflow 上使用一个 Python 任务的输出并用作另一个 Python 任务的输入
Using the output of one Python task and using as the input to another Python Task on Airflow
所以我正在使用 Apache Airflow 创建一个数据流,用于获取存储在 Pandas 数据帧中的一些数据,然后将其存储到 MongoDB 中。所以我有两种 python 方法,一种用于获取数据并返回数据帧,另一种用于将数据存储到相关数据库中。如何获取一项任务的输出并将其作为另一项任务的输入?这是我目前所拥有的(总结和浓缩版)
我研究了 xcom pull 和 push 的概念,这就是我在下面实现的,我还看到有一个用于 Airflow 的 MongoHook,但不太确定如何使用它。
import pandas as pd
import pymongo
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
def get_data(name, **context):
data = pd.read_csv('dataset.csv')
df = data.loc[data.name == name]
context['ti'].xcom_push(task_ids=['get-data'], value=data)
def push_to_db(df, dbname, collection):
client = pymongo.MongoClient(-insert creds here-)
db = client[dbname][collection]
data = df.to_dict(orient='records')
db.insert_many(data)
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='simple_xcom',
default_args=args,
start_date=datetime(2019, 09, 02),
schedule_interval="@daily",
retries=2
)
task1 = PythonOperator(task_id='get-data', params=['name': 'John'],
python_callable=get_data,
provide_context=True, dag=dag)
task2 = PythonOperator(task_id='load-db', params=['df': context['ti'].xcom_pull(task_ids=['get-data'], key='data'),
'dbname': 'person', 'table': 'salary'),
python_callable=push_to_db, provide_context=True, dag=dag)
task1 >> task2
每次我尝试 运行 它时,它都显示上下文不存在。所以也许我在将一项任务的输出作为另一项任务的输入方面做错了?
查看示例 xcom DAG。
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py
正如上面的答案,自定义 XCom 后端可以解决问题。
我们最近为气流实施了自定义 XCom 后端,由 vineyard 支持,以支持此类情况。
vineyard XCom 后端支持零拷贝 DAG 中任务之间的数据共享,并支持python 值,如numpy.ndarray
、pandas.DataFrame
,数据在tensorflow/mxnet/pytorch。
提供商在那里是开源的:https://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow
借助 Vineyard XCom 后端,用户可以拥有直接生产和消费 pandas.DataFrame
的 dag,而无需任何“to_csv” + “from_csv” hack,
import numpy as np
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
@task()
def extract():
order_data_dict = pd.DataFrame({
'a': np.random.rand(100000),
'b': np.random.rand(100000),
})
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
return {"total_order_value": order_data_dict["a"].sum()}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
taskflow_etl_pandas_dag = taskflow_etl_pandas()
希望对您有所帮助。
所以我正在使用 Apache Airflow 创建一个数据流,用于获取存储在 Pandas 数据帧中的一些数据,然后将其存储到 MongoDB 中。所以我有两种 python 方法,一种用于获取数据并返回数据帧,另一种用于将数据存储到相关数据库中。如何获取一项任务的输出并将其作为另一项任务的输入?这是我目前所拥有的(总结和浓缩版)
我研究了 xcom pull 和 push 的概念,这就是我在下面实现的,我还看到有一个用于 Airflow 的 MongoHook,但不太确定如何使用它。
import pandas as pd
import pymongo
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
def get_data(name, **context):
data = pd.read_csv('dataset.csv')
df = data.loc[data.name == name]
context['ti'].xcom_push(task_ids=['get-data'], value=data)
def push_to_db(df, dbname, collection):
client = pymongo.MongoClient(-insert creds here-)
db = client[dbname][collection]
data = df.to_dict(orient='records')
db.insert_many(data)
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='simple_xcom',
default_args=args,
start_date=datetime(2019, 09, 02),
schedule_interval="@daily",
retries=2
)
task1 = PythonOperator(task_id='get-data', params=['name': 'John'],
python_callable=get_data,
provide_context=True, dag=dag)
task2 = PythonOperator(task_id='load-db', params=['df': context['ti'].xcom_pull(task_ids=['get-data'], key='data'),
'dbname': 'person', 'table': 'salary'),
python_callable=push_to_db, provide_context=True, dag=dag)
task1 >> task2
每次我尝试 运行 它时,它都显示上下文不存在。所以也许我在将一项任务的输出作为另一项任务的输入方面做错了?
查看示例 xcom DAG。
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py
正如上面的答案,自定义 XCom 后端可以解决问题。
我们最近为气流实施了自定义 XCom 后端,由 vineyard 支持,以支持此类情况。
vineyard XCom 后端支持零拷贝 DAG 中任务之间的数据共享,并支持python 值,如numpy.ndarray
、pandas.DataFrame
,数据在tensorflow/mxnet/pytorch。
提供商在那里是开源的:https://github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow
借助 Vineyard XCom 后端,用户可以拥有直接生产和消费 pandas.DataFrame
的 dag,而无需任何“to_csv” + “from_csv” hack,
import numpy as np
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
@task()
def extract():
order_data_dict = pd.DataFrame({
'a': np.random.rand(100000),
'b': np.random.rand(100000),
})
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
return {"total_order_value": order_data_dict["a"].sum()}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
taskflow_etl_pandas_dag = taskflow_etl_pandas()
希望对您有所帮助。