Airflow 如何用于 运行 不同机器中一个工作流的不同任务?

How can Airflow be used to run distinct tasks of one workflow in separate machines?

免责声明:我(还)不是 Airflow 的用户,今天才发现它,我开始探索它是否适合我的用例。

我有一个数据处理工作流,它是多个任务的顺序(非并行)执行。但是,某些任务需要 运行 在特定机器上执行。 Airflow 可以解决这个问题吗?这个用例的建议实施模型是什么?

谢谢。

是的,您可以使用 queues 在 Airflow 中实现此目的。您可以将任务绑定到特定队列。然后对于机器上的每个工作人员,您可以将其设置为仅从 select 个队列中提取任务。

在代码中,它看起来像这样:

task_1 = BashOperator(
    dag=dag,
    task_id='task_a',
    ...
)

task_2 = PythonOperator(
    dag=dag,
    task_id='task_b',
    queue='special',
    ...
)

注意airflow.cfg里面有这个设置:

# Default queue that tasks get assigned to and that worker listen on.
default_queue = default

因此,如果您以此开始您的工作人员:

Server A> airflow worker
Server B> airflow worker --queues special
Server C> airflow worker --queues default,special

那么task_1可以被A+C服务器拾取,task_2可以被B+C服务器拾取。