Sidekiq 的大任务或多个小任务

Big task or multiple small tasks with Sidekiq

我正在写一个工作人员将很多用户添加到一个组中。我在想 运行 一个拥有所有用户的大任务,或者像 100 个用户或每个任务一个一个的批处理更好。

目前这是我的代码

class AddUsersToGroupWorker
  include Sidekiq::Worker
  sidekiq_options :queue => :group_utility

  def perform(store_id, group_id, user_ids_to_add)
    begin
      store = Store.find store_id
      group = Group.find group_id
    rescue ActiveRecord::RecordNotFound => e
      Airbrake.notify e
      return
    end

    users_to_process = store.users.where(id: user_ids_to_add)
                                  .where.not(id: group.user_ids)
    group.users += users_to_process

    users_to_process.map(&:id).each do |user_to_process_id|
      UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
    end
  end
end 

也许在我的方法中有这样的东西会更好:

def add_users
    users_to_process = store.users.where(id: user_ids_to_add)
                                  .where.not(id: group.user_ids)

    users_to_process.map(&:id).each do |user_to_process_id|
      AddUserToGroupWorker.perform_async group_id, user_to_process_id
      UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
    end
end

但是这么多 find 请求。你怎么看?

如果需要,我有一个 sidekig pro 许可证(例如批处理)。

没有灵丹妙药。这取决于您的目标和应用程序。要问自己的一般问题:

  • 您可以将多少用户 ID 传递给工作人员?能过100吗? 1000000 呢?
  • 你们的工人可以工作多长时间?它应该对工作时间有任何限制吗?他们能卡住吗?

对于大型应用程序,有必要将传递的参数拆分为更小的块,以避免创建长运行 的作业。创建大量小型工作可以让您轻松扩展 - 您可以随时添加更多工人。

另外,为 worker 定义超时类型以停止处理卡住的 worker 可能是个好主意。

这是我的想法。

1.执行单个 SQL 查询而不是 N 个查询

这一行:group.users += users_to_process 可能产生 N SQL 个查询(其中 N 是 users_to_process.count)。我假设您在用户和组之间有多对多连接(使用 user_groups 加入 table/model),因此您应该使用一些 Mass inserting data technique:

users_to_process_ids = store.users.where(id: user_ids_to_add)
                         .where.not(id: group.user_ids)
                         .pluck(:id)
sql_values = users_to_process_ids.map{|i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())"}
Group.connection.execute("
  INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
  VALUES #{sql_values.join(",")}
")

是的,它是原始的 SQL。而且.

2。用户 pluck(:id) 而不是 map(&:id)

pluck 更快,因为:

  • 它将 select 仅 'id' 列,因此从 DB
  • 传输的数据较少
  • 更重要的是,它不会为每个原始文件创建 ActiveRecord 对象

做 SQL 很便宜。创建 Ruby 个对象非常昂贵。

3。使用水平并行化而不是垂直并行化

我在这里的意思是,如果您需要对十几个记录执行顺序任务 A -> B -> C,有两种主要的拆分工作的方法:

  • 垂直细分AWorkerA(1)A(2)A(3)BWorkerB(1),等等; CWorker 完成所有 C(i) 个工作;
  • 横向分割UniversalWorkerA(1)+B(1)+C(1).

使用后一种(水平)方式。

这是经验的陈述,而不是从某些理论的角度(两种方式都是可行的)。

为什么你应该这样做?

  • 当您使用垂直分割时,当您将工作从一个工人传递给另一个工人时,您可能会遇到错误。喜欢 such kind of errors. You will pull your hair out if you bump into such errors, because they aren't persistent and easily reproducible. Sometimes they happen and sometimes they aren't. Is it possible to write a code which will pass the work down the chain without errors? Sure, it is. But it's better to keep it simple.
  • 假设您的服务器处于静止状态。然后突然有新的工作机会到来。您的 BC 工人只会浪费 RAM,而您的 A 工人会完成这项工作。然后你的 AC 会浪费 RAM,而 B 正在工作。等等。如果你进行水平分割,你的资源消耗会自行消失。

将该建议应用于您的具体情况:对于初学者,不要在另一个异步任务中调用 perform_async

4.批量处理

回答您最初的问题 – 是的,分批处理。创建和管理异步任务本身会占用一些资源,因此无需创建太多。


TL;DR 所以最后,您的代码可能如下所示:

# model code

BATCH_SIZE = 100

def add_users
  users_to_process_ids = store.users.where(id: user_ids_to_add)
                           .where.not(id: group.user_ids)
                           .pluck(:id)
  # With 100,000 users performance of this query should be acceptable
  # to make it in a synchronous fasion
  sql_values = users_to_process_ids.map{|i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())"}
  Group.connection.execute("
    INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
    VALUES #{sql_values.join(",")}
  ")

  users_to_process_ids.each_slice(BATCH_SIZE) do |batch|
    AddUserToGroupWorker.perform_async group_id, batch
  end
end

# add_user_to_group_worker.rb

def perform(group_id, user_ids_to_add)
  group = Group.find group_id

  # Do some heavy load with a batch as a whole
  # ...
  # ...
  # If nothing here is left, call UpdateLastUpdatesForUserWorker from the model instead

  user_ids_to_add.each do |id|
    # do it synchronously – we already parallelized the job
    # by splitting it in slices in the model above
    UpdateLastUpdatesForUserWorker.new.perform store.id, user_to_process_id
  end
end