从单个 python 源代码生成数百个 DAG 时气流变慢

Airflow slows down when generating hundreds of DAGs from single python source code

在我们的大数据项目中,有大约 3000 table 需要加载,所有这些 table 都应该由 Airflow 中的单独 DAG 处理。

在我们的解决方案中,单个 python 文件生成每种类型的 table 加载程序,因此它们可以通过 REST API 以基于事件的方式通过 Cloud Function 单独触发。 因此,我们使用以下方法生成 DAG:

不幸的是,我们绑定了 Airflow 版本 v1.x.x

问题:

我们注意到,当生成多个 DAG 时,Airflow/Cloud Composer 在任务执行之间明显变慢。 当只生成 10-20 个 DAG 时,任务执行之间的时间比我们有 100-200 个 DAG 快得多。 当生成 1000 个 DAG 时,即使没有执行其他 DAG,在完成给定 DAG 的先前任务后也需要几分钟才能开始新任务。

我们不明白为什么任务执行时间受生成的 DAG 数量的影响如此之大。 Airflow 不应该在接近固定时间的情况下在元数据库中搜索 TaskInstances 所需的参数吗? Google.

我们不确定 Cloud Composer 是否 configured/scaled/managed 正确

问题:


这是我们使用的生成器代码的一个非常简单的示例:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def create_dag(dag_id, schedule, dag_number, default_args):

    def example(*args):
        print('Example DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)

    with dag:
        t1 = PythonOperator(task_id='example', python_callable=example)

    return dag


for dag_number  in range(1, 5000):
    dag_id = 'Example_{}'.format(str(dag_number))
    default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
    globals()[dag_id] = create_dag(dag_id, '@daily', dag_number, default_args)

是的。这是一个已知问题。它已在 Airflow 2 中修复。

这是 Airflow 1 中处理 DAG 文件的固有方式(主要与生成的查询数量有关)。

除了迁移到 Airflow 2 之外,您无能为力。解决这个问题需要完全重构和半重写 Airflow 调度程序逻辑。

缓解它的一种方法 - 您可能不是从单个文件生成所有 DAG,而是将其拆分为多个 DAG。例如,不是在单个 Python 文件中生成 DAG 对象,而是可以生成 3000 个独立的、动态生成的小型 DAG 文件。这将更好地扩展。

不过,好消息是在 Airflow 2 中,速度快了许多倍且可扩展。 Airlfow 1.10 已停产,不再受支持,也不会再收到任何更新。因此,与其更改流程,不如我衷心建议迁移。