Sidekiq 父批等待子批的所有作业完成
Sidekiq Parent batch to wait until child batch's all jobs complete
我想创建一个工作流,其中一个 Sidekiq worker 生成一个批处理作业,其中将再次有一个要生成和完成的 worker 列表,只有在这些子 worker 完全执行后,我想运行父批次的回调(on_complete)。
基本上,我想实现这个工作流程
所以,我从文档中研究了 sidekiq 批处理 here and here
但是,上述两个资源都提到了一个工作流,该工作流需要先完成作业的一个步骤,然后使用 Sidekiq Batches 继续执行另一个步骤。
在我的要求中,我不想在每个单独的批处理作业中创建批处理,然后在所有子批处理完成后在主批处理的回调中做一些工作。
请看下面的代码片段,我已经尽力把它说清楚了。如果需要其他信息,请发表评论。谢谢
Class TeamWorker
def perform
Team.all.each do |team.id|
ParentWorker.perform_async(team.id)
end
end
end
Class ParentWorker
def perform(team_id)
assets_batch = Sidekiq::Batch.new
assets_batch.on(:complete, 'ParentWorker#FinalCallback', {:team_id => team_id})
assets_batch.jobs do
Team.find(team_id).assets.each do |asset|
AnotherWorker.perform_async(asset.id)
end
end
def FinalCallback(status, options)
#This should run last after all child jobs workers completes
end
end
class AnotherWorker
def perform(asset_id)
child_batch = Sidekiq::Batch.new
child_batch.on(:complete, 'do_something', {asset_id: asset_id})
child_batch.jobs do
ChildJobWorker.perform_async
ChildJobWorker.perform_async
end
end
end
class ChildJobWorker
def perform
#do some processing/work and return
end
end
问题是您没有将子批次与父批次相关联。您需要重新打开批次才能添加作业 and/or 个子批次。您需要这样做:
class AnotherWorker
def perform(asset_id)
batch.jobs do # reopen this job's batch, now child_batch will really be a child batch
child_batch = Sidekiq::Batch.new
child_batch.on(:complete, 'do_something', {asset_id: asset_id})
child_batch.jobs do
ChildJobWorker.perform_async
ChildJobWorker.perform_async
end
end
end
end
我想创建一个工作流,其中一个 Sidekiq worker 生成一个批处理作业,其中将再次有一个要生成和完成的 worker 列表,只有在这些子 worker 完全执行后,我想运行父批次的回调(on_complete)。
基本上,我想实现这个工作流程
所以,我从文档中研究了 sidekiq 批处理 here and here
但是,上述两个资源都提到了一个工作流,该工作流需要先完成作业的一个步骤,然后使用 Sidekiq Batches 继续执行另一个步骤。
在我的要求中,我不想在每个单独的批处理作业中创建批处理,然后在所有子批处理完成后在主批处理的回调中做一些工作。
请看下面的代码片段,我已经尽力把它说清楚了。如果需要其他信息,请发表评论。谢谢
Class TeamWorker
def perform
Team.all.each do |team.id|
ParentWorker.perform_async(team.id)
end
end
end
Class ParentWorker
def perform(team_id)
assets_batch = Sidekiq::Batch.new
assets_batch.on(:complete, 'ParentWorker#FinalCallback', {:team_id => team_id})
assets_batch.jobs do
Team.find(team_id).assets.each do |asset|
AnotherWorker.perform_async(asset.id)
end
end
def FinalCallback(status, options)
#This should run last after all child jobs workers completes
end
end
class AnotherWorker
def perform(asset_id)
child_batch = Sidekiq::Batch.new
child_batch.on(:complete, 'do_something', {asset_id: asset_id})
child_batch.jobs do
ChildJobWorker.perform_async
ChildJobWorker.perform_async
end
end
end
class ChildJobWorker
def perform
#do some processing/work and return
end
end
问题是您没有将子批次与父批次相关联。您需要重新打开批次才能添加作业 and/or 个子批次。您需要这样做:
class AnotherWorker
def perform(asset_id)
batch.jobs do # reopen this job's batch, now child_batch will really be a child batch
child_batch = Sidekiq::Batch.new
child_batch.on(:complete, 'do_something', {asset_id: asset_id})
child_batch.jobs do
ChildJobWorker.perform_async
ChildJobWorker.perform_async
end
end
end
end