如果传感器数量大于并发数,气流芹菜工人会被阻塞吗?

Airflow celery worker will be blocked if sensor number large than concurrency?

比方说,我将芹菜并发设置为 n,但我有 m(m>n) ExternalTask​​Sensor 在一个 dag 中,它会检查另一个名为 do_sth 的 dag,这些 ExternalTask​​Sensor 将消耗所有 celery worker,所以事实上没有人会工作。

但是我不能把并发设置得太高(比如2*m),因为dag do_sth可能会启动太多的进程会导致内存不足。

我很困惑我应该为 celery 并发设置多少?

作者在 ETL best practices with Airflow's Gotchas section 中解决了这个一般性问题。其中一项建议是为您的传感器任务设置一个池,这样您的其他任务就不会被饿死。根据您的情况,一次确定您想要 运行 的传感器任务数(小于您的并发级别),并以此为限设置一个池。设置池后,将池参数传递给每个传感器操作员。 有关池的更多信息,请参阅 Airflow's documentation on concepts。这是将池参数传递给运算符的示例:

aggregate_db_message_job = BashOperator( 
    task_id='aggregate_db_message_job', 
    execution_timeout=timedelta(hours=3), 
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd, dag=dag)