如果传感器数量大于并发数,气流芹菜工人会被阻塞吗?
Airflow celery worker will be blocked if sensor number large than concurrency?
比方说,我将芹菜并发设置为 n,但我有 m(m>n) ExternalTaskSensor 在一个 dag 中,它会检查另一个名为 do_sth 的 dag,这些 ExternalTaskSensor 将消耗所有 celery worker,所以事实上没有人会工作。
但是我不能把并发设置得太高(比如2*m),因为dag do_sth可能会启动太多的进程会导致内存不足。
我很困惑我应该为 celery 并发设置多少?
作者在 ETL best practices with Airflow's Gotchas section 中解决了这个一般性问题。其中一项建议是为您的传感器任务设置一个池,这样您的其他任务就不会被饿死。根据您的情况,一次确定您想要 运行 的传感器任务数(小于您的并发级别),并以此为限设置一个池。设置池后,将池参数传递给每个传感器操作员。
有关池的更多信息,请参阅 Airflow's documentation on concepts。这是将池参数传递给运算符的示例:
aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd, dag=dag)
比方说,我将芹菜并发设置为 n,但我有 m(m>n) ExternalTaskSensor 在一个 dag 中,它会检查另一个名为 do_sth 的 dag,这些 ExternalTaskSensor 将消耗所有 celery worker,所以事实上没有人会工作。
但是我不能把并发设置得太高(比如2*m),因为dag do_sth可能会启动太多的进程会导致内存不足。
我很困惑我应该为 celery 并发设置多少?
作者在 ETL best practices with Airflow's Gotchas section 中解决了这个一般性问题。其中一项建议是为您的传感器任务设置一个池,这样您的其他任务就不会被饿死。根据您的情况,一次确定您想要 运行 的传感器任务数(小于您的并发级别),并以此为限设置一个池。设置池后,将池参数传递给每个传感器操作员。 有关池的更多信息,请参阅 Airflow's documentation on concepts。这是将池参数传递给运算符的示例:
aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd, dag=dag)