在 google 应用引擎上使用任务队列时如何确定任务的优先级?
How can tasks be prioritized when using the task queue on google app engine?
我正在尝试解决以下问题:
- 我有一系列 "tasks" 我想执行
- 我有固定数量的工人来执行这些工人(因为他们使用 urlfetch 调用外部 API 并且对此 API 的并行调用数量有限)
- 我希望执行这些 "tasks" "as soon as possible"(即最小延迟)
- 这些任务是较大任务的一部分,可以根据原始任务的大小进行分类(即一个小的原始任务可能生成 1 到 100 个任务,一个中等的任务可能生成 100 到 1000 个任务,一个大的超过 1000 个任务).
棘手的部分:我想高效地完成所有这些工作(即最小延迟并使用尽可能多的并行 API 调用 - 不超过限制),但同时尝试防止"large"个原始任务生成的大量任务延迟"small"个原始任务生成的任务。
换句话说:我想为每个任务分配一个 "priority",其中 "small" 个任务具有更高的优先级,从而防止 "large" 个任务造成饥饿。
一些搜索似乎并没有表明任何预制的东西都可用,所以我想出了以下内容:
- 创建三个推送队列:
tasks-small
、tasks-medium
、tasks-large
- 为每个设置最大并发请求数,使总数为最大并发API调用数(例如,如果最大并发API调用数为200,我可以将
tasks-small
设置为 max_concurrent_requests
30,tasks-medium
60 和 tasks-large
100)
- 排队任务时,检查编号。每个队列中的待处理任务(使用诸如 QueueStatistics class 之类的东西),并且,如果其他队列未被 100% 使用,则将任务排在那里,否则只需将任务排入具有相应大小的队列中。
例如,如果我们有任务 T1
,它是一个小任务的一部分,首先检查 tasks-small
是否有空闲 "slots" 并将其排入队列。否则检查 tasks-medium
和 tasks-large
。如果其中 none 个有空闲插槽,则无论如何将它排入 tasks-small
队列,它将在处理之前添加任务之后处理(注意:这不是最佳的,因为如果 "slots" 释放在其他队列上,它们仍然不会处理来自 tasks-small
队列的待处理任务)
另一种选择是使用 PULL 队列,并有一个中央 "coordinator" 根据优先级从该队列中拉取并分派它们,但这似乎会增加一点延迟。
然而,这似乎有点老套,我想知道是否有更好的选择。
编辑:经过一些思考和反馈后,我正在考虑通过以下方式使用 PULL 队列:
- 有两个 PULL 队列(
medium-tasks
和 large-tasks
)
- 有一个并发为 1 的调度程序 (PUSH) 队列(以便任何时候只有一个调度任务运行)。派发任务有多种创建方式:
- 通过一分钟一次的 cron 作业
- 将 medium/large 任务添加到推送队列后
- 工作任务完成后
- 有一个工人(PUSH)队列,其并发度等于工人数
工作流程:
- 小任务直接加入worker队列
- 调度程序任务,无论何时被触发,都会执行以下操作:
- 估计空闲工人的数量(通过查看工人队列中 运行 任务的数量)
- 对于任何 "free" 槽,它从 medium/large tasks PULL 队列中获取一个任务并将其排入一个 worker 队列(或更准确地说:将其添加到 worker PUSH 队列,这将导致它被执行 - 最终 - 在一个工人身上)。
一旦实施并至少进行适度测试,我会报告。
small/medium/large 原始任务队列本身不会有太大帮助 - 一旦原始任务入队,它们将继续产生工作任务,甚至可能打破工作任务队列大小限制。所以你需要 pace/control 排队原始任务。
我会跟踪 datastore/GCS 中的 "todo" 原始任务,并仅在相应队列大小足够低(1 或可能 2待处理作业),来自重复任务、cron 作业或延迟任务(取决于执行原始任务排队所需的速率),这将实现所需的节奏和优先级逻辑,就像推送队列调度程序,但没有您提到的额外延迟。
我没有使用拉取队列,但据我了解,它们可能非常适合您的用例。您可以定义 3 个拉取队列,并让 X
工作人员从他们那里拉取任务,首先尝试 "small" 队列,然后在 "medium" 队列为空时移动到 "medium"(其中 X
是您的最大并发数)。您不需要中央调度程序。
但是,即使没有任务(或 X / threadsPerMachine
?),您也需要为 X
名员工付费,或者自己缩小和扩大他们的规模。
所以,这是另一个想法:使用正确的 maximum concurrency
创建一个推送队列。当您收到新任务时,将其信息推送到 datastore 并排队 generic 作业。然后,该通用作业将查询数据存储以按优先顺序查找任务,并执行它找到的第一个任务。这样,下一个作业仍会执行一个短任务,即使该作业已经从一个大任务中排队。
编辑:我现在迁移到一个更简单的解决方案,类似于@eric-simonton 所描述的:
- 我有多个 PULL 队列,每个优先级一个
- 许多工作人员拉取端点(处理程序)
- 处理程序生成一个随机数并执行简单的 "if less than 0.6, try first the small queue and then the large queue, else vice-versa (large then small)"
- 如果工作人员没有任务或出现错误,他们会进行半随机指数退避直到最大超时(即,他们开始每 1 秒拉一次,并且在每次空拉后大约加倍超时,最多 30 秒)
最后一点是必要的 - 除其他原因外 - 因为从 PULL 队列每秒拉取的次数限制为 10k/s:https://cloud.google.com/appengine/docs/python/taskqueue/overview-pull#Python_Leasing_tasks
我实施了更新中描述的解决方案:
- 两个 PULL 队列(中型任务和大型任务)
- 并发数为 1 的调度程序 (PUSH) 队列
- 并发等于worker数量的worker(PUSH)队列
有关详细信息,请参阅问题。一些注意事项:
- 由于最终一致性,任务可见性有一些延迟(即,调度员任务有时看不到拉队列中的任务,即使它们被插入在一起)——我通过添加倒计时 5 来解决秒到调度程序任务,并添加一个 cron 作业,每分钟添加一个调度程序任务(因此,如果原始调度程序任务没有 "see" 来自拉队列的任务,另一个将在稍后出现)
- 确保为每个任务命名,以消除重复分派它们的可能性
- 您不能从 PULL 队列中租用 0 个项目:-)
- 批处理操作有上限,因此您必须对批处理任务队列调用进行自己的批处理
- 似乎没有办法以编程方式获取队列的 "maximum parallelism" 值,因此我不得不在调度程序中对其进行硬编码(以计算它可以安排的任务数量)
- 如果队列中已有一些(至少 10 个)调度程序任务,则不要添加它们
我正在尝试解决以下问题:
- 我有一系列 "tasks" 我想执行
- 我有固定数量的工人来执行这些工人(因为他们使用 urlfetch 调用外部 API 并且对此 API 的并行调用数量有限)
- 我希望执行这些 "tasks" "as soon as possible"(即最小延迟)
- 这些任务是较大任务的一部分,可以根据原始任务的大小进行分类(即一个小的原始任务可能生成 1 到 100 个任务,一个中等的任务可能生成 100 到 1000 个任务,一个大的超过 1000 个任务).
棘手的部分:我想高效地完成所有这些工作(即最小延迟并使用尽可能多的并行 API 调用 - 不超过限制),但同时尝试防止"large"个原始任务生成的大量任务延迟"small"个原始任务生成的任务。
换句话说:我想为每个任务分配一个 "priority",其中 "small" 个任务具有更高的优先级,从而防止 "large" 个任务造成饥饿。
一些搜索似乎并没有表明任何预制的东西都可用,所以我想出了以下内容:
- 创建三个推送队列:
tasks-small
、tasks-medium
、tasks-large
- 为每个设置最大并发请求数,使总数为最大并发API调用数(例如,如果最大并发API调用数为200,我可以将
tasks-small
设置为max_concurrent_requests
30,tasks-medium
60 和tasks-large
100) - 排队任务时,检查编号。每个队列中的待处理任务(使用诸如 QueueStatistics class 之类的东西),并且,如果其他队列未被 100% 使用,则将任务排在那里,否则只需将任务排入具有相应大小的队列中。
例如,如果我们有任务 T1
,它是一个小任务的一部分,首先检查 tasks-small
是否有空闲 "slots" 并将其排入队列。否则检查 tasks-medium
和 tasks-large
。如果其中 none 个有空闲插槽,则无论如何将它排入 tasks-small
队列,它将在处理之前添加任务之后处理(注意:这不是最佳的,因为如果 "slots" 释放在其他队列上,它们仍然不会处理来自 tasks-small
队列的待处理任务)
另一种选择是使用 PULL 队列,并有一个中央 "coordinator" 根据优先级从该队列中拉取并分派它们,但这似乎会增加一点延迟。
然而,这似乎有点老套,我想知道是否有更好的选择。
编辑:经过一些思考和反馈后,我正在考虑通过以下方式使用 PULL 队列:
- 有两个 PULL 队列(
medium-tasks
和large-tasks
) - 有一个并发为 1 的调度程序 (PUSH) 队列(以便任何时候只有一个调度任务运行)。派发任务有多种创建方式:
- 通过一分钟一次的 cron 作业
- 将 medium/large 任务添加到推送队列后
- 工作任务完成后
- 有一个工人(PUSH)队列,其并发度等于工人数
工作流程:
- 小任务直接加入worker队列
- 调度程序任务,无论何时被触发,都会执行以下操作:
- 估计空闲工人的数量(通过查看工人队列中 运行 任务的数量)
- 对于任何 "free" 槽,它从 medium/large tasks PULL 队列中获取一个任务并将其排入一个 worker 队列(或更准确地说:将其添加到 worker PUSH 队列,这将导致它被执行 - 最终 - 在一个工人身上)。
一旦实施并至少进行适度测试,我会报告。
small/medium/large 原始任务队列本身不会有太大帮助 - 一旦原始任务入队,它们将继续产生工作任务,甚至可能打破工作任务队列大小限制。所以你需要 pace/control 排队原始任务。
我会跟踪 datastore/GCS 中的 "todo" 原始任务,并仅在相应队列大小足够低(1 或可能 2待处理作业),来自重复任务、cron 作业或延迟任务(取决于执行原始任务排队所需的速率),这将实现所需的节奏和优先级逻辑,就像推送队列调度程序,但没有您提到的额外延迟。
我没有使用拉取队列,但据我了解,它们可能非常适合您的用例。您可以定义 3 个拉取队列,并让 X
工作人员从他们那里拉取任务,首先尝试 "small" 队列,然后在 "medium" 队列为空时移动到 "medium"(其中 X
是您的最大并发数)。您不需要中央调度程序。
但是,即使没有任务(或 X / threadsPerMachine
?),您也需要为 X
名员工付费,或者自己缩小和扩大他们的规模。
所以,这是另一个想法:使用正确的 maximum concurrency
创建一个推送队列。当您收到新任务时,将其信息推送到 datastore 并排队 generic 作业。然后,该通用作业将查询数据存储以按优先顺序查找任务,并执行它找到的第一个任务。这样,下一个作业仍会执行一个短任务,即使该作业已经从一个大任务中排队。
编辑:我现在迁移到一个更简单的解决方案,类似于@eric-simonton 所描述的:
- 我有多个 PULL 队列,每个优先级一个
- 许多工作人员拉取端点(处理程序)
- 处理程序生成一个随机数并执行简单的 "if less than 0.6, try first the small queue and then the large queue, else vice-versa (large then small)"
- 如果工作人员没有任务或出现错误,他们会进行半随机指数退避直到最大超时(即,他们开始每 1 秒拉一次,并且在每次空拉后大约加倍超时,最多 30 秒)
最后一点是必要的 - 除其他原因外 - 因为从 PULL 队列每秒拉取的次数限制为 10k/s:https://cloud.google.com/appengine/docs/python/taskqueue/overview-pull#Python_Leasing_tasks
我实施了更新中描述的解决方案:
- 两个 PULL 队列(中型任务和大型任务)
- 并发数为 1 的调度程序 (PUSH) 队列
- 并发等于worker数量的worker(PUSH)队列
有关详细信息,请参阅问题。一些注意事项:
- 由于最终一致性,任务可见性有一些延迟(即,调度员任务有时看不到拉队列中的任务,即使它们被插入在一起)——我通过添加倒计时 5 来解决秒到调度程序任务,并添加一个 cron 作业,每分钟添加一个调度程序任务(因此,如果原始调度程序任务没有 "see" 来自拉队列的任务,另一个将在稍后出现)
- 确保为每个任务命名,以消除重复分派它们的可能性
- 您不能从 PULL 队列中租用 0 个项目:-)
- 批处理操作有上限,因此您必须对批处理任务队列调用进行自己的批处理
- 似乎没有办法以编程方式获取队列的 "maximum parallelism" 值,因此我不得不在调度程序中对其进行硬编码(以计算它可以安排的任务数量)
- 如果队列中已有一些(至少 10 个)调度程序任务,则不要添加它们