将 Sidekiq 用于 Active Job 并获得 ActiveJob::DeserializationError

Using Sidekiq for Active Job and getting ActiveJob::DeserializationError

我正在尝试使用 Sidekiq 来 运行 下面的工作。

作业在未排队时执行良好 (perform_now),但在调用时失败 (perform_later),它使用 Sidekiq。

AddEmployeesToRoomJob.perform_now room  ## works fine
AddEmployeesToRoomJob.perform_later room  ## breaks in Sidekiq

错误:

AddEmployeesToRoomJob JID-da24b13f405b1ece1212bbd5 INFO: fail: 0.003     sec
2016-08-20T14:57:16.645Z 19456 TID-owmym5fbk WARN:     {"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped"    :"AddEmployeesToRoomJob","queue":"default","args":    [{"job_class":"AddEmployeesToRoomJob","job_id":"0ba5bd30-e281-49a7-a93f-    6e50445183ac","queue_name":"default","priority":null,"arguments":    [{"_aj_globalid":"gid://dragonfly/Room/1"}],"locale":"en"}],"retry":true,    "jid":"da24b13f405b1ece1212bbd5","created_at":1471704675.739077,"enqueued    _at":1471705036.6406531,"error_message":"Error while trying to     deserialize arguments: Couldn't find Room with     'id'=1","error_class":"ActiveJob::DeserializationError","failed_at":14717    04675.946183,"retry_count":4,"retried_at":1471705036.644416}
2016-08-20T14:57:16.645Z 19456 TID-owmym5fbk WARN:     ActiveJob::DeserializationError: Error while trying to deserialize     arguments: Couldn't find Room with 'id'=1
2016-08-20T14:57:16.645Z 19456 TID-owmym5fbk WARN:     /Users/tamlyn/.rvm/gems/ruby-2.2.3/gems/activerecord-    5.0.0.1/lib/active_record/relation/finder_methods.rb:357:in     `raise_record_not_found_exception!'

我的工作 class AddEmployeesToRoomJob < ApplicationJob queue_as:默认

  def perform(room)
    employees = Employee.all
    if employees.length > 0
      employees.each do |employee|
        UserRoom.create(user: employee, room: room)
      end
    end
  end
end

我的想法 我不明白为什么它找不到我传递给执行方法的房间。好像它在作业的队列化/ JSONifying 中以某种方式丢失了该变量?

Sidekiq 文档说

"Unfortunately this means that if the [Room] record is deleted after the job is enqueued but before the perform method is called, exception handling is different."

他们提出了解决方法,但我看不出这对我有什么帮助:

rescue_from ActiveJob::DeserializationError do |exception|
    # handle a deleted user record
end

在此先感谢您的帮助!

我认为将 Room 对象传递给 Sidekiq worker 不是一个好主意。我总是传递数据库对象的主键,然后重新查询。试试这个。

AddEmployeesToRoomJob.perform_later room.id

def perform(room_id)
  room = Room.find(room_id)
  employees = Employee.all
    if employees.length > 0
      employees.each do |employee|
        UserRoom.create(user: employee, room: room)
      end
    end
  end
end

不建议将 Ruby 对象作为参数发送给 sidekick worker。您可以将 Id 作为参数发送并在 perform 方法中初始化对象。如果要发送对象,那么您可以将 Ruby 对象转储为任何其他格式,例如 Json/ Binary / yml,如下所示。

object.to_json 

或者

Marshal.dump(object)

并且在使用 perform 方法中的对象之前,您可以将对象反序列化为 Ruby 对象,如下所示。

JSON.parse(serialized_object)

或者

Marshal.load(serialized_object)

这些是针对您的问题的解决方案,但不是理想的解决方案。

如果可以确保已提交模型,则可以将模型传递给作业。在提交当前事务之前将作业排入队列是一个典型的错误。

您可以在此处找到更多信息: https://github.com/mperham/sidekiq/wiki/FAQ#why-am-i-seeing-a-lot-of-cant-find-modelname-with-id12345-errors-with-sidekiq

在我们的项目中,我们编写了一个模型关注点,以便能够在提交代码块后添加动态。

module AfterCommitOnce
  extend ActiveSupport::Concern

  included do
    after_commit :execute_after_commit_handlers
  end

  def after_commit_once(&block)
    @after_commits = @after_commits || []
    @after_commits << block
  end

  private

  def execute_after_commit_handlers
    @after_commits = @after_commits || []
    @after_commits.each do |ac|
      ac.call
    end
    @after_commits = []
  end
end

将此问题添加到您的 Employee 模型中,您可以指定在将所有更改提交到您的数据库后要执行的作业:

...
employee.after_commit_once do
   AddEmployeesToRoomJob.perform_later
end
employee.save!

我自己偶然发现了这个方法 discard_on 非常有用。大多数情况下,没有必要对已删除或从未创建的记录执行作业。

示例:

class ApplicationJob < ActiveJob::Base
  discard_on ActiveJob::DeserializationError do |job, error|
    Rails.logger.error("Skipping job because of ActiveJob::DeserializationError (#{error.message})")
  end
end