以用户为中心的工作流程的 Airflow DAG 设计
Airflow DAG design for a user centric workflow
我们正在考虑使用气流来替换我们当前基于自定义 rq 的工作流,但我不确定设计它的最佳方式。或者如果使用气流甚至有意义。
用例是:
- 我们收到用户上传的数据。
- 鉴于接收到的数据类型,我们可以选择运行零个或多个作业
- 每个作业 运行s 如果收到特定的数据类型组合。根据收到的数据
确定的时间范围内,该用户 运行s
- 作业从数据库中读取数据并将结果写入数据库。
- 这些作业可能会触发更多作业。
例如
数据上传后,我们将一个项目放入队列:
上传:
user: 'a'
data:
- type: datatype1
start: 1
end: 3
- type: datatype2
start: 2
end: 3
这会触发:
- 工作 1,用户 'a',开始:1,结束:3
- job2,用户 'a',开始:2,结束:3
然后可能 job1 会在 运行 之后进行一些清理工作。
(如果同一用户没有其他作业 运行ning 时能够将作业限制为仅 运行 也很好。)
我考虑过的方法:
1.
当数据上传到达消息队列时触发 DAG。
然后这个 DAG 确定哪些依赖作业到 运行 并将用户和时间范围作为参数(或 xcom)传递。
2.
当数据上传到达消息队列时触发 DAG。
然后此 DAG 会根据用户和时间范围内的数据类型和模板为作业动态创建 DAGS。
因此您可以获得每个用户、工作、时间范围组合的动态 DAG。
我什至不确定如何从消息队列中触发 DAG...并且发现很难找到与此用例类似的示例。也许那是因为 Airflow 不适合?
任何 help/thoughts/advice 将不胜感激。
谢谢。
Airflow 是围绕基于时间的时间表构建的。它不是为了根据数据的着陆来触发运行而构建的。有其他系统设计用于执行此操作。我听到了类似 pachyderm.io 或 dvs.org 的声音。甚至重新利用 CI 工具或自定义 Jenkins 设置都可以根据文件更改事件或消息队列触发。
不过,您可以尝试通过让外部队列侦听器使用 rest API calls to Airflow 触发 DAG 来使用 Airflow。 EG,如果队列是 AWS SNS 队列,您可以简单地使用 AWS Lambda 侦听器 Python 执行此操作。
我建议每个作业类型(或者是用户,以较小者为准)一个 DAG,触发器逻辑根据队列确定它是正确的。如果有常见的清理任务等,DAG 可能会使用 TriggerDagRunOperator 来启动这些任务,或者您可能只是拥有每个 DAG 包含的那些清理任务的公共库。我觉得后者更干净
DAG 可以将其任务限制在某些池中。您可以为每个用户创建一个池,以限制每个用户的作业运行。或者,如果每个用户有一个 DAG,则可以将该 DAG 的最大并发 DAG 运行设置为合理的值。
我们正在考虑使用气流来替换我们当前基于自定义 rq 的工作流,但我不确定设计它的最佳方式。或者如果使用气流甚至有意义。 用例是:
- 我们收到用户上传的数据。
- 鉴于接收到的数据类型,我们可以选择运行零个或多个作业
- 每个作业 运行s 如果收到特定的数据类型组合。根据收到的数据 确定的时间范围内,该用户 运行s
- 作业从数据库中读取数据并将结果写入数据库。
- 这些作业可能会触发更多作业。
例如
数据上传后,我们将一个项目放入队列:
上传:
user: 'a'
data:
- type: datatype1
start: 1
end: 3
- type: datatype2
start: 2
end: 3
这会触发:
- 工作 1,用户 'a',开始:1,结束:3
- job2,用户 'a',开始:2,结束:3
然后可能 job1 会在 运行 之后进行一些清理工作。 (如果同一用户没有其他作业 运行ning 时能够将作业限制为仅 运行 也很好。)
我考虑过的方法:
1.
当数据上传到达消息队列时触发 DAG。
然后这个 DAG 确定哪些依赖作业到 运行 并将用户和时间范围作为参数(或 xcom)传递。
2.
当数据上传到达消息队列时触发 DAG。
然后此 DAG 会根据用户和时间范围内的数据类型和模板为作业动态创建 DAGS。
因此您可以获得每个用户、工作、时间范围组合的动态 DAG。
我什至不确定如何从消息队列中触发 DAG...并且发现很难找到与此用例类似的示例。也许那是因为 Airflow 不适合?
任何 help/thoughts/advice 将不胜感激。
谢谢。
Airflow 是围绕基于时间的时间表构建的。它不是为了根据数据的着陆来触发运行而构建的。有其他系统设计用于执行此操作。我听到了类似 pachyderm.io 或 dvs.org 的声音。甚至重新利用 CI 工具或自定义 Jenkins 设置都可以根据文件更改事件或消息队列触发。
不过,您可以尝试通过让外部队列侦听器使用 rest API calls to Airflow 触发 DAG 来使用 Airflow。 EG,如果队列是 AWS SNS 队列,您可以简单地使用 AWS Lambda 侦听器 Python 执行此操作。
我建议每个作业类型(或者是用户,以较小者为准)一个 DAG,触发器逻辑根据队列确定它是正确的。如果有常见的清理任务等,DAG 可能会使用 TriggerDagRunOperator 来启动这些任务,或者您可能只是拥有每个 DAG 包含的那些清理任务的公共库。我觉得后者更干净
DAG 可以将其任务限制在某些池中。您可以为每个用户创建一个池,以限制每个用户的作业运行。或者,如果每个用户有一个 DAG,则可以将该 DAG 的最大并发 DAG 运行设置为合理的值。