您如何确保同一管道不会同时执行两次
How can you ensure, that the same pipeline is not executed twice at the same time
嘿 :) 我有一个关于锁定或互斥行为的问题。
场景:
让我们假设以下场景:
- 管道正在处理一些本地文件。这些文件由 CI-CD 作业放置。处理后我想删除文件。如果作业花费的时间超过计划间隔
,这将导致竞争条件
- 两个管道占用大量资源,因此不能 运行 并行。
可能的解决方案
- 目前我会在 运行ning 服务中使用某种 Mutex 或 Lock,其中管道可以注册并允许或不执行。
- 复制数据以确保每个工作流都可以清理和使用自己的数据。
- 创建本地锁定文件,并确保成功后该文件将被删除。
- 创建一个较小的调度间隔并检查是否存在锁定。如果不满足条件,则干净地退出。
我知道这可能不是 dagster 的正常用例,但我也想将 dagster 用于其他工作流,例如清理任务和触发其他管道。
谢谢
我不熟悉 dagster,但我在其他上下文中成功使用的一种机制是利用在类 Unix 系统中重命名或 mv 是原子操作这一事实。对于 post 运行 清理的第一个要求:
新文件被放入输入目录。一组输入文件可以隔离在它们自己的目录中。
当管道进程启动时,它的第一个操作是从输入目录select一个文件(或目录)并将其移动到管道实例拥有的工作目录。如果输入目录中没有可用文件,进程将自行关闭。
如果 mv 成功,进程将继续对刚刚移动到其工作目录的文件(目录)执行操作。完成后,它会自行清理,可能是通过对其工作目录进行递归删除。
如果 mv 失败,则意味着另一个进程从这个文件下抓取了新文件。失败的进程自行关闭。
对于一次仅 运行 一个管道进程的第二个要求,您可以使用独占创建哨兵文件并让进程失败并在未成功创建哨兵文件时退出.在 python 3 中,代码可能类似于
try:
open('sentinel', 'x').close()
except FileExistsError:
exit("someone else already has sentinel")
do_stuff()
os.remove('sentinel')
当然,如果您的进程在 do_stuff() 的某处崩溃,您将不得不手动清理 sentinel 文件,或者您可以使用 atexit 处理程序来确保甚至删除 sentinel在 do_stuff().
崩溃的情况下
感谢您分享您的用例。我认为 Dagster 当前不支持这些特性。然而 0.10.0 版本(几个月后)将包括 运行 级队列,允许您对并发管道设置 运行s 限制。目前它只支持 运行s 的全局限制,但很快将支持添加基于管道标签的规则(例如,标记为 'resource-heavy' 的管道可以限制为 3 个并发 运行s)。看起来这可能适合这个用例?
预览当前排队系统的指南是here。也可以随时通过@johann 的 Dagster slack 与我联系!
针对场景 #2(处理资源占用非常大且不能 运行 并行的管道)的建议是使用 dagster 的 Celery integrations, such as the celery_executor, celery_docker_executor, or the celery_k8s_job_executor(如果您使用的是 kubernetes)。
这些工作的方式是 Dagster 管道 运行 协调器将每个可靠的执行任务添加到 Celery 队列中,Celery 允许您限制每个队列中活动任务的数量。例如,这通常用于确保在给定时间只有 X 个实体连接到 Redshift。
Dagster 还支持使用多个队列,因此您可以为资源密集型实体创建一个队列,为非资源密集型实体创建另一个队列(具有更高的并发限制)。
关于场景 #1,我不确定您有什么设计限制。一种想法是使用管道 运行 标签的标记方案来跟踪哪个管道 运行 对应于哪个文件;然后对于每个文件,执行文件清理的过程在删除之前首先验证是否存在成功的管道 运行(通过查询 运行s 数据库)。
嘿 :) 我有一个关于锁定或互斥行为的问题。
场景:
让我们假设以下场景:
- 管道正在处理一些本地文件。这些文件由 CI-CD 作业放置。处理后我想删除文件。如果作业花费的时间超过计划间隔 ,这将导致竞争条件
- 两个管道占用大量资源,因此不能 运行 并行。
可能的解决方案
- 目前我会在 运行ning 服务中使用某种 Mutex 或 Lock,其中管道可以注册并允许或不执行。
- 复制数据以确保每个工作流都可以清理和使用自己的数据。
- 创建本地锁定文件,并确保成功后该文件将被删除。
- 创建一个较小的调度间隔并检查是否存在锁定。如果不满足条件,则干净地退出。
我知道这可能不是 dagster 的正常用例,但我也想将 dagster 用于其他工作流,例如清理任务和触发其他管道。
谢谢
我不熟悉 dagster,但我在其他上下文中成功使用的一种机制是利用在类 Unix 系统中重命名或 mv 是原子操作这一事实。对于 post 运行 清理的第一个要求:
新文件被放入输入目录。一组输入文件可以隔离在它们自己的目录中。
当管道进程启动时,它的第一个操作是从输入目录select一个文件(或目录)并将其移动到管道实例拥有的工作目录。如果输入目录中没有可用文件,进程将自行关闭。
如果 mv 成功,进程将继续对刚刚移动到其工作目录的文件(目录)执行操作。完成后,它会自行清理,可能是通过对其工作目录进行递归删除。
如果 mv 失败,则意味着另一个进程从这个文件下抓取了新文件。失败的进程自行关闭。
对于一次仅 运行 一个管道进程的第二个要求,您可以使用独占创建哨兵文件并让进程失败并在未成功创建哨兵文件时退出.在 python 3 中,代码可能类似于
try:
open('sentinel', 'x').close()
except FileExistsError:
exit("someone else already has sentinel")
do_stuff()
os.remove('sentinel')
当然,如果您的进程在 do_stuff() 的某处崩溃,您将不得不手动清理 sentinel 文件,或者您可以使用 atexit 处理程序来确保甚至删除 sentinel在 do_stuff().
崩溃的情况下感谢您分享您的用例。我认为 Dagster 当前不支持这些特性。然而 0.10.0 版本(几个月后)将包括 运行 级队列,允许您对并发管道设置 运行s 限制。目前它只支持 运行s 的全局限制,但很快将支持添加基于管道标签的规则(例如,标记为 'resource-heavy' 的管道可以限制为 3 个并发 运行s)。看起来这可能适合这个用例?
预览当前排队系统的指南是here。也可以随时通过@johann 的 Dagster slack 与我联系!
针对场景 #2(处理资源占用非常大且不能 运行 并行的管道)的建议是使用 dagster 的 Celery integrations, such as the celery_executor, celery_docker_executor, or the celery_k8s_job_executor(如果您使用的是 kubernetes)。
这些工作的方式是 Dagster 管道 运行 协调器将每个可靠的执行任务添加到 Celery 队列中,Celery 允许您限制每个队列中活动任务的数量。例如,这通常用于确保在给定时间只有 X 个实体连接到 Redshift。
Dagster 还支持使用多个队列,因此您可以为资源密集型实体创建一个队列,为非资源密集型实体创建另一个队列(具有更高的并发限制)。
关于场景 #1,我不确定您有什么设计限制。一种想法是使用管道 运行 标签的标记方案来跟踪哪个管道 运行 对应于哪个文件;然后对于每个文件,执行文件清理的过程在删除之前首先验证是否存在成功的管道 运行(通过查询 运行s 数据库)。