Sidekiq 的大任务或多个小任务
Big task or multiple small tasks with Sidekiq
我正在写一个工作人员将很多用户添加到一个组中。我在想 运行 一个拥有所有用户的大任务,或者像 100 个用户或每个任务一个一个的批处理更好。
目前这是我的代码
class AddUsersToGroupWorker
include Sidekiq::Worker
sidekiq_options :queue => :group_utility
def perform(store_id, group_id, user_ids_to_add)
begin
store = Store.find store_id
group = Group.find group_id
rescue ActiveRecord::RecordNotFound => e
Airbrake.notify e
return
end
users_to_process = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
group.users += users_to_process
users_to_process.map(&:id).each do |user_to_process_id|
UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
end
end
end
也许在我的方法中有这样的东西会更好:
def add_users
users_to_process = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
users_to_process.map(&:id).each do |user_to_process_id|
AddUserToGroupWorker.perform_async group_id, user_to_process_id
UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
end
end
但是这么多 find
请求。你怎么看?
如果需要,我有一个 sidekig pro 许可证(例如批处理)。
没有灵丹妙药。这取决于您的目标和应用程序。要问自己的一般问题:
- 您可以将多少用户 ID 传递给工作人员?能过100吗? 1000000 呢?
- 你们的工人可以工作多长时间?它应该对工作时间有任何限制吗?他们能卡住吗?
对于大型应用程序,有必要将传递的参数拆分为更小的块,以避免创建长运行 的作业。创建大量小型工作可以让您轻松扩展 - 您可以随时添加更多工人。
另外,为 worker 定义超时类型以停止处理卡住的 worker 可能是个好主意。
这是我的想法。
1.执行单个 SQL 查询而不是 N 个查询
这一行:group.users += users_to_process
可能产生 N SQL 个查询(其中 N 是 users_to_process.count)。我假设您在用户和组之间有多对多连接(使用 user_groups
加入 table/model),因此您应该使用一些 Mass inserting data technique:
users_to_process_ids = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
.pluck(:id)
sql_values = users_to_process_ids.map{|i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())"}
Group.connection.execute("
INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
VALUES #{sql_values.join(",")}
")
是的,它是原始的 SQL。而且快.
2。用户 pluck(:id)
而不是 map(&:id)
pluck
更快,因为:
- 它将 select 仅 'id' 列,因此从 DB
传输的数据较少
- 更重要的是,它不会为每个原始文件创建 ActiveRecord 对象
做 SQL 很便宜。创建 Ruby 个对象非常昂贵。
3。使用水平并行化而不是垂直并行化
我在这里的意思是,如果您需要对十几个记录执行顺序任务 A -> B -> C
,有两种主要的拆分工作的方法:
- 垂直细分。
AWorker
做 A(1)
、A(2)
、A(3)
; BWorker
做 B(1)
,等等; CWorker
完成所有 C(i)
个工作;
- 横向分割。
UniversalWorker
做 A(1)+B(1)+C(1)
.
使用后一种(水平)方式。
这是经验的陈述,而不是从某些理论的角度(两种方式都是可行的)。
为什么你应该这样做?
- 当您使用垂直分割时,当您将工作从一个工人传递给另一个工人时,您可能会遇到错误。喜欢 such kind of errors. You will pull your hair out if you bump into such errors, because they aren't persistent and easily reproducible. Sometimes they happen and sometimes they aren't. Is it possible to write a code which will pass the work down the chain without errors? Sure, it is. But it's better to keep it simple.
- 假设您的服务器处于静止状态。然后突然有新的工作机会到来。您的
B
和 C
工人只会浪费 RAM,而您的 A
工人会完成这项工作。然后你的 A
和 C
会浪费 RAM,而 B
正在工作。等等。如果你进行水平分割,你的资源消耗会自行消失。
将该建议应用于您的具体情况:对于初学者,不要在另一个异步任务中调用 perform_async
。
4.批量处理
回答您最初的问题 – 是的,分批处理。创建和管理异步任务本身会占用一些资源,因此无需创建太多。
TL;DR 所以最后,您的代码可能如下所示:
# model code
BATCH_SIZE = 100
def add_users
users_to_process_ids = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
.pluck(:id)
# With 100,000 users performance of this query should be acceptable
# to make it in a synchronous fasion
sql_values = users_to_process_ids.map{|i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())"}
Group.connection.execute("
INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
VALUES #{sql_values.join(",")}
")
users_to_process_ids.each_slice(BATCH_SIZE) do |batch|
AddUserToGroupWorker.perform_async group_id, batch
end
end
# add_user_to_group_worker.rb
def perform(group_id, user_ids_to_add)
group = Group.find group_id
# Do some heavy load with a batch as a whole
# ...
# ...
# If nothing here is left, call UpdateLastUpdatesForUserWorker from the model instead
user_ids_to_add.each do |id|
# do it synchronously – we already parallelized the job
# by splitting it in slices in the model above
UpdateLastUpdatesForUserWorker.new.perform store.id, user_to_process_id
end
end
我正在写一个工作人员将很多用户添加到一个组中。我在想 运行 一个拥有所有用户的大任务,或者像 100 个用户或每个任务一个一个的批处理更好。
目前这是我的代码
class AddUsersToGroupWorker
include Sidekiq::Worker
sidekiq_options :queue => :group_utility
def perform(store_id, group_id, user_ids_to_add)
begin
store = Store.find store_id
group = Group.find group_id
rescue ActiveRecord::RecordNotFound => e
Airbrake.notify e
return
end
users_to_process = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
group.users += users_to_process
users_to_process.map(&:id).each do |user_to_process_id|
UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
end
end
end
也许在我的方法中有这样的东西会更好:
def add_users
users_to_process = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
users_to_process.map(&:id).each do |user_to_process_id|
AddUserToGroupWorker.perform_async group_id, user_to_process_id
UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
end
end
但是这么多 find
请求。你怎么看?
如果需要,我有一个 sidekig pro 许可证(例如批处理)。
没有灵丹妙药。这取决于您的目标和应用程序。要问自己的一般问题:
- 您可以将多少用户 ID 传递给工作人员?能过100吗? 1000000 呢?
- 你们的工人可以工作多长时间?它应该对工作时间有任何限制吗?他们能卡住吗?
对于大型应用程序,有必要将传递的参数拆分为更小的块,以避免创建长运行 的作业。创建大量小型工作可以让您轻松扩展 - 您可以随时添加更多工人。
另外,为 worker 定义超时类型以停止处理卡住的 worker 可能是个好主意。
这是我的想法。
1.执行单个 SQL 查询而不是 N 个查询
这一行:group.users += users_to_process
可能产生 N SQL 个查询(其中 N 是 users_to_process.count)。我假设您在用户和组之间有多对多连接(使用 user_groups
加入 table/model),因此您应该使用一些 Mass inserting data technique:
users_to_process_ids = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
.pluck(:id)
sql_values = users_to_process_ids.map{|i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())"}
Group.connection.execute("
INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
VALUES #{sql_values.join(",")}
")
是的,它是原始的 SQL。而且快.
2。用户 pluck(:id)
而不是 map(&:id)
pluck
更快,因为:
- 它将 select 仅 'id' 列,因此从 DB 传输的数据较少
- 更重要的是,它不会为每个原始文件创建 ActiveRecord 对象
做 SQL 很便宜。创建 Ruby 个对象非常昂贵。
3。使用水平并行化而不是垂直并行化
我在这里的意思是,如果您需要对十几个记录执行顺序任务 A -> B -> C
,有两种主要的拆分工作的方法:
- 垂直细分。
AWorker
做A(1)
、A(2)
、A(3)
;BWorker
做B(1)
,等等;CWorker
完成所有C(i)
个工作; - 横向分割。
UniversalWorker
做A(1)+B(1)+C(1)
.
使用后一种(水平)方式。
这是经验的陈述,而不是从某些理论的角度(两种方式都是可行的)。
为什么你应该这样做?
- 当您使用垂直分割时,当您将工作从一个工人传递给另一个工人时,您可能会遇到错误。喜欢 such kind of errors. You will pull your hair out if you bump into such errors, because they aren't persistent and easily reproducible. Sometimes they happen and sometimes they aren't. Is it possible to write a code which will pass the work down the chain without errors? Sure, it is. But it's better to keep it simple.
- 假设您的服务器处于静止状态。然后突然有新的工作机会到来。您的
B
和C
工人只会浪费 RAM,而您的A
工人会完成这项工作。然后你的A
和C
会浪费 RAM,而B
正在工作。等等。如果你进行水平分割,你的资源消耗会自行消失。
将该建议应用于您的具体情况:对于初学者,不要在另一个异步任务中调用 perform_async
。
4.批量处理
回答您最初的问题 – 是的,分批处理。创建和管理异步任务本身会占用一些资源,因此无需创建太多。
TL;DR 所以最后,您的代码可能如下所示:
# model code
BATCH_SIZE = 100
def add_users
users_to_process_ids = store.users.where(id: user_ids_to_add)
.where.not(id: group.user_ids)
.pluck(:id)
# With 100,000 users performance of this query should be acceptable
# to make it in a synchronous fasion
sql_values = users_to_process_ids.map{|i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())"}
Group.connection.execute("
INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
VALUES #{sql_values.join(",")}
")
users_to_process_ids.each_slice(BATCH_SIZE) do |batch|
AddUserToGroupWorker.perform_async group_id, batch
end
end
# add_user_to_group_worker.rb
def perform(group_id, user_ids_to_add)
group = Group.find group_id
# Do some heavy load with a batch as a whole
# ...
# ...
# If nothing here is left, call UpdateLastUpdatesForUserWorker from the model instead
user_ids_to_add.each do |id|
# do it synchronously – we already parallelized the job
# by splitting it in slices in the model above
UpdateLastUpdatesForUserWorker.new.perform store.id, user_to_process_id
end
end