Airflow:为每个文件 运行 DAG 的正确方法
Airflow: Proper way to run DAG for each file
我有以下任务要解决:
Files are being sent at irregular times through an endpoint and stored locally. I need to trigger a DAG run for each of these files. For each file the same tasks will be performed
总体流程如下:对于每个文件,运行 任务 A->B->C->D
正在批量处理文件。虽然这个任务对我来说似乎微不足道,但我找到了几种方法来做到这一点,但我很困惑哪一个是 "proper" 那个(如果有的话)。
第一种模式:使用实验性 REST API 触发 dag。
即公开一个 Web 服务,该服务接收请求和文件,将其存储到文件夹中,并使用 experimental REST api 触发 DAG,方法是将 file_id 作为 conf
缺点:REST api 仍然是实验性的,不确定 Airflow 如何处理同时有许多请求的负载测试(这不应该发生,但是,如果发生了怎么办?)
第二个模式:2个DAG。一种是使用 TriggerDagOperator 进行感知和触发,一种是处理。
始终使用与之前描述的相同的 ws,但这次它只存储文件。那么我们有:
- 第一个 dag:使用 FileSensor 和 TriggerDagOperator 触发给定 N 个文件的 N 个 dag
- 第二个 dag:任务 A->B->C
缺点:需要避免将相同的文件发送到两个不同的 DAG 运行。
示例:
文件夹中的文件 x.json
传感器找到 x,触发 DAG (1)
传感器返回并重新安排。如果 DAG (1) 没有 process/move 文件,则传感器 DAG 可能会重新安排具有相同文件的新 DAG 运行。这是不需要的。
第三种模式:文件中文件,任务A->B->C
如 中所示。
缺点:这可以工作,但是我不喜欢的是 UI 可能会搞砸,因为每个 DAG 运行 看起来都不会相同,但它会随着正在处理的文件数量而变化。此外,如果有 1000 个文件要处理,运行 可能很难阅读
第四种模式:使用子标签
我还不确定它们是如何像我看到的那样完全工作的 they are not encouraged (at the end), however it should be possible to spawn a subdag for each file and have it running. Similar to 。
缺点:似乎子标签只能与顺序执行器一起使用。
我是不是遗漏了一些东西并且过度思考了一些应该(在我看来)非常简单的东西?谢谢
似乎您应该能够 运行 带有 bash 运算符的批处理器 dag 来清除文件夹,只需确保在 dag 上设置 depends_on_past=True
以确保在下次安排 dag 之前,文件夹已成功清除。
我找到这篇文章:https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
这里使用了一个新的算子,即TriggerMultiDagRunOperator。我认为这符合我的需要。
我知道我迟到了,但我会选择第二种模式:“2 个 dags。一个使用 TriggerDagOperator 感知和触发,一个过程”,因为:
- 每个文件都可以并行执行
- 第一个 DAG 可以选择要处理的文件,重命名它(添加后缀“_processing”或将其移动到处理文件夹)
- 如果我是你们公司的新开发人员,打开工作流,我想了解工作流的逻辑是什么,而不是上次动态构建时处理了哪些文件
- 如果 dag 2 发现文件有问题,则会对其重命名(使用“_error”后缀或将其移至错误文件夹)
- 这是一种无需创建任何额外运算符即可处理文件的标准方法
- 它使 de DAG 幂等且更易于测试。更多信息在此 article
重命名 and/or 移动文件是在每个 ETL 中处理文件的一种非常标准的方法。
顺便说一句,我一直推荐这篇文章https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753。没有
我有以下任务要解决:
Files are being sent at irregular times through an endpoint and stored locally. I need to trigger a DAG run for each of these files. For each file the same tasks will be performed
总体流程如下:对于每个文件,运行 任务 A->B->C->D
正在批量处理文件。虽然这个任务对我来说似乎微不足道,但我找到了几种方法来做到这一点,但我很困惑哪一个是 "proper" 那个(如果有的话)。
第一种模式:使用实验性 REST API 触发 dag。
即公开一个 Web 服务,该服务接收请求和文件,将其存储到文件夹中,并使用 experimental REST api 触发 DAG,方法是将 file_id 作为 conf
缺点:REST api 仍然是实验性的,不确定 Airflow 如何处理同时有许多请求的负载测试(这不应该发生,但是,如果发生了怎么办?)
第二个模式:2个DAG。一种是使用 TriggerDagOperator 进行感知和触发,一种是处理。
始终使用与之前描述的相同的 ws,但这次它只存储文件。那么我们有:
- 第一个 dag:使用 FileSensor 和 TriggerDagOperator 触发给定 N 个文件的 N 个 dag
- 第二个 dag:任务 A->B->C
缺点:需要避免将相同的文件发送到两个不同的 DAG 运行。 示例:
文件夹中的文件 x.json 传感器找到 x,触发 DAG (1)
传感器返回并重新安排。如果 DAG (1) 没有 process/move 文件,则传感器 DAG 可能会重新安排具有相同文件的新 DAG 运行。这是不需要的。
第三种模式:文件中文件,任务A->B->C
如
缺点:这可以工作,但是我不喜欢的是 UI 可能会搞砸,因为每个 DAG 运行 看起来都不会相同,但它会随着正在处理的文件数量而变化。此外,如果有 1000 个文件要处理,运行 可能很难阅读
第四种模式:使用子标签
我还不确定它们是如何像我看到的那样完全工作的 they are not encouraged (at the end), however it should be possible to spawn a subdag for each file and have it running. Similar to
缺点:似乎子标签只能与顺序执行器一起使用。
我是不是遗漏了一些东西并且过度思考了一些应该(在我看来)非常简单的东西?谢谢
似乎您应该能够 运行 带有 bash 运算符的批处理器 dag 来清除文件夹,只需确保在 dag 上设置 depends_on_past=True
以确保在下次安排 dag 之前,文件夹已成功清除。
我找到这篇文章:https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
这里使用了一个新的算子,即TriggerMultiDagRunOperator。我认为这符合我的需要。
我知道我迟到了,但我会选择第二种模式:“2 个 dags。一个使用 TriggerDagOperator 感知和触发,一个过程”,因为:
- 每个文件都可以并行执行
- 第一个 DAG 可以选择要处理的文件,重命名它(添加后缀“_processing”或将其移动到处理文件夹)
- 如果我是你们公司的新开发人员,打开工作流,我想了解工作流的逻辑是什么,而不是上次动态构建时处理了哪些文件
- 如果 dag 2 发现文件有问题,则会对其重命名(使用“_error”后缀或将其移至错误文件夹)
- 这是一种无需创建任何额外运算符即可处理文件的标准方法
- 它使 de DAG 幂等且更易于测试。更多信息在此 article
重命名 and/or 移动文件是在每个 ETL 中处理文件的一种非常标准的方法。
顺便说一句,我一直推荐这篇文章https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753。没有