Airflow 如何用于 运行 不同机器中一个工作流的不同任务?
How can Airflow be used to run distinct tasks of one workflow in separate machines?
免责声明:我(还)不是 Airflow 的用户,今天才发现它,我开始探索它是否适合我的用例。
我有一个数据处理工作流,它是多个任务的顺序(非并行)执行。但是,某些任务需要 运行 在特定机器上执行。 Airflow 可以解决这个问题吗?这个用例的建议实施模型是什么?
谢谢。
是的,您可以使用 queues 在 Airflow 中实现此目的。您可以将任务绑定到特定队列。然后对于机器上的每个工作人员,您可以将其设置为仅从 select 个队列中提取任务。
在代码中,它看起来像这样:
task_1 = BashOperator(
dag=dag,
task_id='task_a',
...
)
task_2 = PythonOperator(
dag=dag,
task_id='task_b',
queue='special',
...
)
注意airflow.cfg里面有这个设置:
# Default queue that tasks get assigned to and that worker listen on.
default_queue = default
因此,如果您以此开始您的工作人员:
Server A> airflow worker
Server B> airflow worker --queues special
Server C> airflow worker --queues default,special
那么task_1可以被A+C服务器拾取,task_2可以被B+C服务器拾取。
免责声明:我(还)不是 Airflow 的用户,今天才发现它,我开始探索它是否适合我的用例。
我有一个数据处理工作流,它是多个任务的顺序(非并行)执行。但是,某些任务需要 运行 在特定机器上执行。 Airflow 可以解决这个问题吗?这个用例的建议实施模型是什么?
谢谢。
是的,您可以使用 queues 在 Airflow 中实现此目的。您可以将任务绑定到特定队列。然后对于机器上的每个工作人员,您可以将其设置为仅从 select 个队列中提取任务。
在代码中,它看起来像这样:
task_1 = BashOperator(
dag=dag,
task_id='task_a',
...
)
task_2 = PythonOperator(
dag=dag,
task_id='task_b',
queue='special',
...
)
注意airflow.cfg里面有这个设置:
# Default queue that tasks get assigned to and that worker listen on.
default_queue = default
因此,如果您以此开始您的工作人员:
Server A> airflow worker
Server B> airflow worker --queues special
Server C> airflow worker --queues default,special
那么task_1可以被A+C服务器拾取,task_2可以被B+C服务器拾取。