Airflow - 任务中的并行执行

Airflow - Parallel execution within a Task

我有一个数据库 table 看起来像:

name        last_successful
dataset1    2020-11-07 10:30
dataset2    2020-11-07 10:30

现在我想在 airflow 中安排一个执行以下操作的工作流:

  1. 从数据库中读取所有数据集 namelast_successful
  2. 对于每个数据集,检查对应于下一个 30 分钟桶的数据是否完成 last_successful + 30 min(假设这是一个黑盒子)
  3. 在与步骤 2 中找到的任何已完成存储桶相对应的 s3 数据分区中写入一个 _SUCCESS 文件,并在数据库中更新 last_successful

由于数据集是相互独立的,所以步骤2+3可以针对每个数据集并行化。我怎样才能在气流中做到这一点?我不想为每个数据集创建单独的任务,因为数据集列表会不断增加。

对评论的讨论进行总结。

创建一个 DAG 作为其从数据库中解析读取记录的一部分是一种不好的做法。 Airflow 不断地解析文件,并会打开一个到数据库的连接来获取最新的记录。这意味着数据库上的连接负载很重。 出于这个原因,Airflow 将为尝试使用此方法的用户添加警告(请参阅 issue)。虽然警告出现在 Airflow Metastore 后端,但同样适用于任何其他数据库。

您可以通过在文件中列出数据集名称来处理您的用例。如果您要创建一个带有动态任务的 DAG,您可以为文件中的任何新条目打开一个“分支”。然后您可以在专用运算符中将条目与您的数据库进行比较,以确保它是有效的。

该方法可能类似于:

def get_file():
    with open('your_file') as f:
         lines = f.read().splitlines()
    return lines

file_list = get_file()
 with DAG(dag_id='my_dag',...
) as dag:
   start_op = DummyOperator(task_id='start_task')
   for dataset in file_list:
          my_op = MyOperator(task_id=dataset)
          start_op >> my_op

这样,每次您将新数据集添加到文件时,它都会自动为它添加 MyOperator

分支