保证一些操作符将在同一个气流工作者上执行
Guarantee that some operators will be executed on the same airflow worker
我有一个 DAG
- 从云存储下载一个 csv 文件
- 通过 https
将 csv 文件上传到第三方
我正在执行的 airflow 集群默认使用 CeleryExecutor
,所以我担心在某些时候当我扩大 worker 数量时,这些任务可能会在不同的 worker 上执行。例如。工作人员 A 进行下载,工作人员 B 尝试上传,但没有找到文件(因为它在工作人员 A 上)
是否有可能以某种方式保证下载和上传运算符都在同一个气流工作者上执行?
对于这类用例,我们有两种解决方案:
- 使用两者共享的网络安装驱动器
工作人员,以便下载和上传任务都可以访问
到同一个文件系统
- 使用特定于工作人员的 Airflow queue。如果只有一个工作人员在监听这个队列,你将保证他们都可以访问同一个文件系统。请注意,每个工作人员都可以监听多个队列,因此您可以让它监听 "default" 队列以及用于此任务的自定义队列。
将第 1 步(csv 下载)和第 2 步(csv 上传)放入 subdag,然后通过 SubDagOperator 将 executor
选项设置为 SequentialExecutor
来触发它 - 这个将确保步骤 1 和 2 运行 在同一个工人身上。
这是一个说明该概念的工作 DAG 文件(实际操作作为 DummyOperators 存根),在一些更大的过程的上下文中有 download/upload 步骤:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors.sequential_executor import SequentialExecutor
PARENT_DAG_NAME='subdaggy'
CHILD_DAG_NAME='subby'
def make_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date
)
task_download = DummyOperator(
task_id = 'task_download_csv',
dag=dag
)
task_upload = DummyOperator(
task_id = 'task_upload_csv',
dag=dag
)
task_download >> task_upload
return dag
main_dag = DAG(
PARENT_DAG_NAME,
schedule_interval=None,
start_date=datetime(2017,1,1)
)
main_task_1 = DummyOperator(
task_id = 'main_1',
dag = main_dag
)
main_task_2 = SubDagOperator(
task_id = CHILD_DAG_NAME,
subdag=make_sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, main_dag.schedule_interval),
executor=SequentialExecutor(),
dag=main_dag
)
main_task_3 = DummyOperator(
task_id = 'main_3',
dag = main_dag
)
main_task_1 >> main_task_2 >> main_task_3
我有一个 DAG
- 从云存储下载一个 csv 文件
- 通过 https 将 csv 文件上传到第三方
我正在执行的 airflow 集群默认使用 CeleryExecutor
,所以我担心在某些时候当我扩大 worker 数量时,这些任务可能会在不同的 worker 上执行。例如。工作人员 A 进行下载,工作人员 B 尝试上传,但没有找到文件(因为它在工作人员 A 上)
是否有可能以某种方式保证下载和上传运算符都在同一个气流工作者上执行?
对于这类用例,我们有两种解决方案:
- 使用两者共享的网络安装驱动器 工作人员,以便下载和上传任务都可以访问 到同一个文件系统
- 使用特定于工作人员的 Airflow queue。如果只有一个工作人员在监听这个队列,你将保证他们都可以访问同一个文件系统。请注意,每个工作人员都可以监听多个队列,因此您可以让它监听 "default" 队列以及用于此任务的自定义队列。
将第 1 步(csv 下载)和第 2 步(csv 上传)放入 subdag,然后通过 SubDagOperator 将 executor
选项设置为 SequentialExecutor
来触发它 - 这个将确保步骤 1 和 2 运行 在同一个工人身上。
这是一个说明该概念的工作 DAG 文件(实际操作作为 DummyOperators 存根),在一些更大的过程的上下文中有 download/upload 步骤:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors.sequential_executor import SequentialExecutor
PARENT_DAG_NAME='subdaggy'
CHILD_DAG_NAME='subby'
def make_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date
)
task_download = DummyOperator(
task_id = 'task_download_csv',
dag=dag
)
task_upload = DummyOperator(
task_id = 'task_upload_csv',
dag=dag
)
task_download >> task_upload
return dag
main_dag = DAG(
PARENT_DAG_NAME,
schedule_interval=None,
start_date=datetime(2017,1,1)
)
main_task_1 = DummyOperator(
task_id = 'main_1',
dag = main_dag
)
main_task_2 = SubDagOperator(
task_id = CHILD_DAG_NAME,
subdag=make_sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, main_dag.schedule_interval),
executor=SequentialExecutor(),
dag=main_dag
)
main_task_3 = DummyOperator(
task_id = 'main_3',
dag = main_dag
)
main_task_1 >> main_task_2 >> main_task_3