Sidekiq 父批等待子批的所有作业完成

Sidekiq Parent batch to wait until child batch's all jobs complete

我想创建一个工作流,其中一个 Sidekiq worker 生成一个批处理作业,其中将再次有一个要生成和完成的 worker 列表,只有在这些子 worker 完全执行后,我想运行父批次的回调(on_complete)。

基本上,我想实现这个工作流程

所以,我从文档中研究了 sidekiq 批处理 here and here

但是,上述两个资源都提到了一个工作流,该工作流需要先完成作业的一个步骤,然后使用 Sidekiq Batches 继续执行另一个步骤。

在我的要求中,我不想在每个单独的批处理作业中创建批处理,然后在所有子批处理完成后在主批处理的回调中做一些工作。

请看下面的代码片段,我已经尽力把它说清楚了。如果需要其他信息,请发表评论。谢谢

Class TeamWorker
    def perform
        Team.all.each do |team.id|
            ParentWorker.perform_async(team.id)
          end
     end
end


Class ParentWorker
    def perform(team_id)
        assets_batch = Sidekiq::Batch.new
        assets_batch.on(:complete, 'ParentWorker#FinalCallback', {:team_id => team_id})
        assets_batch.jobs do
            Team.find(team_id).assets.each do |asset|
                AnotherWorker.perform_async(asset.id)
            end
    end
    def FinalCallback(status, options)
        #This should run last after all child jobs workers completes
    end
end

class AnotherWorker
    def perform(asset_id)
        child_batch = Sidekiq::Batch.new
        child_batch.on(:complete, 'do_something', {asset_id: asset_id})
        child_batch.jobs do
            ChildJobWorker.perform_async
            ChildJobWorker.perform_async
        end
    end
end

class ChildJobWorker
    def perform
        #do some processing/work and return
    end
end

问题是您没有将子批次与父批次相关联。您需要重新打开批次才能添加作业 and/or 个子批次。您需要这样做:

class AnotherWorker
    def perform(asset_id)
      batch.jobs do # reopen this job's batch, now child_batch will really be a child batch
        child_batch = Sidekiq::Batch.new
        child_batch.on(:complete, 'do_something', {asset_id: asset_id})
        child_batch.jobs do
            ChildJobWorker.perform_async
            ChildJobWorker.perform_async
        end
      end
    end
end