将成功和失败处理程序传递给 ActiveJob

Passing success and failure handlers to an ActiveJob

我有一个 ActiveJob,它应该通过 HTTP 从外部系统加载一段数据。该作业完成后,我想排队执行一些后处理的第二个作业,然后将数据提交到不同的外部系统。

我不想让第一份工作知道第二份工作,因为

  1. 封装
  2. 可重用性
  3. 这是第一份工作的none,基本上

同样,我不希望第一份工作关心如果数据加载失败接下来会发生什么——也许用户会收到通知,也许我们在超时后重试,也许我们只是记录它并抛出我们的手 -- 同样,它可能会根据异常的详细信息而有所不同,并且作业不需要包含该逻辑或与其他系统的连接来处理它。

在 Java 中(这是我最有经验的地方),我可以使用像 Guava 的 ListenableFuture 这样的东西在事后添加成功和失败回调:

MyDataLoader loader = new MyDataLoader(someDataSource)
ListenableFuture<Data> future = executor.submit(loader);
Futures.addCallback(future, new FutureCallback<Data>() {
    public void onSuccess(Data result) {
        processData(result);
    }
    public void onFailure(Throwable t) {
        handleFailure(t);
    }
});
但是,

ActiveJob 似乎没有提供这种外部回调机制——我可以从 "Active Job Basics"、after_perform 和 [=] 中的 relevant sections 中了解到14=] 只能从作业 class 中调用。而且after_peform并不是为了区分成功与失败。

所以我能想到的最好的方法(我并不是说它很好)是将几个 lambda 表达式传递到作业的 perform 方法中,因此:

class MyRecordLoader < ActiveJob::Base

  # Loads data expensively (hopefully on a background queue) and passes
  # the result, or any exception, to the appropriate specified lambda.
  #
  # @param data_source [String] the URL to load data from
  # @param on_success [-> (String)] A lambda that will be passed the record
  #   data, if it's loaded successfully
  # @param on_failure [-> (Exception)] A lambda that will be passed any
  #   exception, if there is one
  def perform(data_source, on_success, on_failure)
    begin
      result = load_data_expensively_from data_source
      on_success.call(result)
    rescue => exception
      on_failure.call(exception)
    end
  end

end

(旁注:我不知道用于将 lambda 声明为参数的 yardoc 语法是什么。这看起来是否正确,或者,如果不正确,是否合理?)

然后调用者必须传递这些:

MyRecordLoader.perform_later(
  some_data_source,
  method(:process_data),
  method(:handle_failure)
)

这并不 糟糕 ,至少在调用端是这样,但它看起来很笨拙,我不禁怀疑这有一个共同的模式,我只是没有找到。我有点担心,作为一个 Ruby/Rails 新手,我只是让 ActiveJob 去做一些它一开始就不想做的事情。我找到的所有 ActiveJob 示例都是 'fire and forget' -- 异步 "returning" 结果似乎不是 ActiveJob 用例。

此外,我不清楚这是否适用于像 Resque 这样在单独进程中运行作业的后端。

"the Ruby way"要做什么?


更新: by dre-hh, ActiveJob turned out not to be the right tool here. It was also unreliable, and overcomplicated for the situation. I switched to Concurrent Ruby instead, which fits the use case better, and which, since the tasks are mostly IO-bound, is fast enough even on MRI, despite the GIL.

ActiveJob 不是像 future 或 promise 这样的异步库。

它只是一个用于在后台执行任务的界面。当前 thread/process 没有收到此操作的结果。

比如使用Sidekiq作为ActiveJob队列时,会将perform方法的参数序列化到redis store中。 rails 应用上下文中的另一个守护进程 运行 将监视 Redis 队列并使用序列化数据实例化您的工作程序。

所以传递回调可能没问题,但是为什么要将它们作为另一个 class 的方法。如果回调是动态的(在不同的调用中发生变化),则传递回调是有意义的。然而,当您在调用 class 上实现它们时,请考虑将这些方法移至您的工作人员 class.