Airflow - 任务中的并行执行
Airflow - Parallel execution within a Task
我有一个数据库 table 看起来像:
name last_successful
dataset1 2020-11-07 10:30
dataset2 2020-11-07 10:30
现在我想在 airflow 中安排一个执行以下操作的工作流:
- 从数据库中读取所有数据集
name
和 last_successful
。
- 对于每个数据集,检查对应于下一个 30 分钟桶的数据是否完成
last_successful + 30 min
(假设这是一个黑盒子)
- 在与步骤 2 中找到的任何已完成存储桶相对应的 s3 数据分区中写入一个 _SUCCESS 文件,并在数据库中更新
last_successful
。
由于数据集是相互独立的,所以步骤2+3可以针对每个数据集并行化。我怎样才能在气流中做到这一点?我不想为每个数据集创建单独的任务,因为数据集列表会不断增加。
对评论的讨论进行总结。
创建一个 DAG 作为其从数据库中解析读取记录的一部分是一种不好的做法。 Airflow 不断地解析文件,并会打开一个到数据库的连接来获取最新的记录。这意味着数据库上的连接负载很重。
出于这个原因,Airflow 将为尝试使用此方法的用户添加警告(请参阅 issue)。虽然警告出现在 Airflow Metastore 后端,但同样适用于任何其他数据库。
您可以通过在文件中列出数据集名称来处理您的用例。如果您要创建一个带有动态任务的 DAG,您可以为文件中的任何新条目打开一个“分支”。然后您可以在专用运算符中将条目与您的数据库进行比较,以确保它是有效的。
该方法可能类似于:
def get_file():
with open('your_file') as f:
lines = f.read().splitlines()
return lines
file_list = get_file()
with DAG(dag_id='my_dag',...
) as dag:
start_op = DummyOperator(task_id='start_task')
for dataset in file_list:
my_op = MyOperator(task_id=dataset)
start_op >> my_op
这样,每次您将新数据集添加到文件时,它都会自动为它添加 MyOperator
分支
我有一个数据库 table 看起来像:
name last_successful
dataset1 2020-11-07 10:30
dataset2 2020-11-07 10:30
现在我想在 airflow 中安排一个执行以下操作的工作流:
- 从数据库中读取所有数据集
name
和last_successful
。 - 对于每个数据集,检查对应于下一个 30 分钟桶的数据是否完成
last_successful + 30 min
(假设这是一个黑盒子) - 在与步骤 2 中找到的任何已完成存储桶相对应的 s3 数据分区中写入一个 _SUCCESS 文件,并在数据库中更新
last_successful
。
由于数据集是相互独立的,所以步骤2+3可以针对每个数据集并行化。我怎样才能在气流中做到这一点?我不想为每个数据集创建单独的任务,因为数据集列表会不断增加。
对评论的讨论进行总结。
创建一个 DAG 作为其从数据库中解析读取记录的一部分是一种不好的做法。 Airflow 不断地解析文件,并会打开一个到数据库的连接来获取最新的记录。这意味着数据库上的连接负载很重。 出于这个原因,Airflow 将为尝试使用此方法的用户添加警告(请参阅 issue)。虽然警告出现在 Airflow Metastore 后端,但同样适用于任何其他数据库。
您可以通过在文件中列出数据集名称来处理您的用例。如果您要创建一个带有动态任务的 DAG,您可以为文件中的任何新条目打开一个“分支”。然后您可以在专用运算符中将条目与您的数据库进行比较,以确保它是有效的。
该方法可能类似于:
def get_file():
with open('your_file') as f:
lines = f.read().splitlines()
return lines
file_list = get_file()
with DAG(dag_id='my_dag',...
) as dag:
start_op = DummyOperator(task_id='start_task')
for dataset in file_list:
my_op = MyOperator(task_id=dataset)
start_op >> my_op
这样,每次您将新数据集添加到文件时,它都会自动为它添加 MyOperator