Rufus Scheduler - Help Starting / Stopping Jobs in Model

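After reading the Rufus Scheduler documentation about 100 times, going through every Stack Overflow question I could find, looking at the source code, and finally reading "How to Report Bugs Effectively" twice, I have reached the point where I need your help, @jmettraux: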

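The Application

I am writing an internal application that lets my team create metrics; each metric periodically runs code that saves a value to the database. I have the following setup.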

schema.rb

create_table "metrics", force: :cascade do |t|
    t.string   "frequency"
    t.string   "name"
    t.text     "data"
    t.datetime "created_at", null: false
    t.datetime "updated_at", null: false
    t.boolean  "active"
  end

initializers/scheduler.rb

require 'rufus-scheduler'

class Rufus::Scheduler::Job
  def report
    logger = Logger.new(STDOUT)
    logger.info "Job: #{@id} Tags: #{@tags} Frequency: #{@frequency}"
  end
end

SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")

unless defined?(Rails::Console) || File.split($0).last == 'rake'
  # Launch all Metrics jobs that are active
  @metrics = Metric.active

  unless SCHEDULER.down?
    @metrics.each_with_index do | metric, index |
      SCHEDULER.every metric.frequency, :tags => metric.id, :overlap => false, :timeout => metric.try(:timeout) || '3m' do | job |
        metric.add_value
        job.report
      end
    end
  end
end

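What Works

The code above works perfectly: Rufus initializes correctly and all jobs start as expected.

The Problem

The one problem I am running into is that, once the app is running, I need to be able to unschedule jobs and reschedule them when a metric is saved. I use Rufus tags: when a job is created it is assigned a tag with the metric's ID, and I use that tag to load the job for unscheduling. Here is my current Metric model with the relevant code.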

models/metric.rb

class Metric < ApplicationRecord

  ...

  after_save :update_job

  private
  def update_job
    if self.changed.present?
      job = SCHEDULER.jobs(tag: self.id).first
      if job.present?
        logger.info "Job #{job.id}, for Metric #{self.id} unscheduled."
        SCHEDULER.unschedule(job.id)
      end

      if self.active
        metric = self
        new_job = SCHEDULER.every self.frequency, :tags => self.id, :overlap => false, :timeout => self.try(:timeout) || '3m' do | job |
          metric.add_value
        end
        logger.info "Job #{new_job} for Metric #{self.id} scheduled."
      end
    end
  end

end

The Actual Problem!

Scenario 1:

  • Application starts
  • Scheduler initializer loads all active metrics
  • Scheduler initializer starts all active metrics as 'every' jobs
  • User edits an active metric, setting it to inactive
  • The update_job method is run after the metric is saved
  • Inside the update_job method, the job for the current metric is found successfully
  • The unschedule method is called on the job
  • PROBLEM HERE: The job runs again even after being unscheduled

Scenario 2:

  • Application starts
  • Scheduler initializer loads all active metrics
  • Scheduler initializer starts all active metrics as 'every' jobs
  • User edits an inactive metric, setting it to active
  • The update_job method is run after the metric is saved
  • Inside the update_job method, a new 'every' job is scheduled for the metric
  • PROBLEM HERE: The job never gets started

What I've Tried

System/Application Info

Phase 2 Logs - 12/20/2016

Scenario 1:

  xxx SCHEDULER started: 70168058449780
  xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
  xxx Metric 2 #update_job active: false
  xxx Metric 2 #update_job 0 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 2 interval_jobs: 0 cron_jobs: 0
  xxx Metric 2 found Job Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
  xxx Metric 2 Job every_1482273018.01852_1684784125738549751 unscheduled
  xxx Metric 2 #update_job 1 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
  xxx Metric 2 #update_job 2 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
  xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
  xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0

Scenario 2:

  xxx SCHEDULER started: 70272158805840
  xxx Metric 2 #update_job active: true
  xxx Metric 2 #update_job 0 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
  xxx Metric 2 #update_job 1 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
  xxx Metric 2 Job every_1482273159.799382_1489585858424637616 scheduled
  xxx Metric 2 #update_job 2 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 2 interval_jobs: 0 cron_jobs: 0

Phase 3 Logs - 12/20/2016

Scenario 1:

xxx SCHEDULER started: 70321240613760
xxx sc Rufus::Scheduler 3.2.2 70321240613760
xxx sc down? false
xxx sc Process.pid 39975
xxx sc jobs:
xxx sc   0: j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600
xxx initializer over.
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 #update_job changed: ["active", "updated_at"] active: false
xxx Metric 2 sc #update_job 0 Rufus::Scheduler 3.2.2 70321240613760
xxx Metric 2 sc #update_job 0 down? false
xxx Metric 2 sc #update_job 0 Process.pid 40037
xxx Metric 2 sc #update_job 0 jobs:
xxx Metric 2 sc #update_job 0   0: j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600
xxx Metric 2 found Job j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600 first
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 unscheduled
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled? false
xxx Metric 2 sc #update_job 1 Rufus::Scheduler 3.2.2 70321240613760
xxx Metric 2 sc #update_job 1 down? false
xxx Metric 2 sc #update_job 1 Process.pid 40037
xxx Metric 2 sc #update_job 1 jobs:
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2

Scenario 2:

xxx SCHEDULER started: 70152969118240
xxx sc Rufus::Scheduler 3.2.2 70152969118240
xxx sc down? false
xxx sc Process.pid 40811
xxx sc jobs:
xxx initializer over.
xxx Metric 2 #update_job changed: ["active", "updated_at"] active: true
xxx Metric 2 sc #update_job 0 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 0 down? false
xxx Metric 2 sc #update_job 0 Process.pid 40872
xxx Metric 2 sc #update_job 0 jobs:
xxx Metric 2 sc #update_job 1 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 1 down? false
xxx Metric 2 sc #update_job 1 Process.pid 40872
xxx Metric 2 sc #update_job 1 jobs:
xxx Metric 2 Job every_1482336051.9823449_3069051954259909338 scheduled in #update_job
xxx Metric 2 sc #update_job 2 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 2 down? false
xxx Metric 2 sc #update_job 2 Process.pid 40872
xxx Metric 2 sc #update_job 2 jobs:
xxx Metric 2 sc #update_job 2   0: j: Rufus::Scheduler::EveryJob i: "every_1482336051.9823449_3069051954259909338" oi: 70152971745840 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 10:01:11 -0600

When the above code runs for an active metric going inactive, I get the logger messages and the right job is loaded. Also, when I inspect the code at that point with better_errors, the job's @unscheduled_at attribute is set correctly; however, the job keeps getting rescheduled to run again.

On the other hand, when the update_job method runs and tries to reschedule the job, the new job never starts.

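So you're trying to say that the job gets rescheduled, but the new, rescheduled job never triggers?

If I look naively at your code (rehashed for better understanding):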

def update_job

  if self.changed.present?
    job = SCHEDULER.jobs(tag: self.id).first
    if job.present?
      logger.info "Job #{job.id}, for Metric #{self.id} unscheduled."
      job.unschedule
    end

    if self.active
      metric = self
      new_job_id =
        SCHEDULER.every(
          self.frequency,
          :tags => self.id,
          :overlap => false,
          :timeout => self.try(:timeout) || '3m'
        ) { |job| metric.add_value }
      logger.info "Job #{new_job_id} for Metric #{self.id} scheduled."
    end
  end
end

Could the symptom you describe be restated as "application never enters the reschedule block of the update_job method"?

The Metric with ID 2 has a frequency of 20s. It runs twice then gets unscheduled correctly so I know it's working properly outside of the model code.

So it's in your model code. There is little chance it is related to rufus-scheduler.

Please rephrase your "actual question" carefully. In its current version, it confuses me.

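Phase 2 - 2016-12-21

Could you try with the following code?

Warning: it is untested, and it probably contains mistakes that you will surely find and fix to make it usable.

Then try your two scenarios and report back, grepping the log for the "xxx " lines emitted by this code. It might tell us what is going wrong.

Thanks in advance.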

# initializers/scheduler.rb

require 'rufus-scheduler'

class Rufus::Scheduler
  def to_report_s
    a = []
    a << self.class
    a << Rufus::Scheduler::VERSION
    a << self.object_id
    a << "down? #{self.down?}"
    %w[ at in every interval cron ].each do |flav|
      m = "#{flav}_jobs".to_sym
      a << "#{m}: #{self.send(m).size}"
    end
    a.collect(&:to_s).join(' ')
  end
end
class Rufus::Scheduler::Job
  def to_report_s
    "Job: #{self.class} #{@id} (#{self.object_id}) " +
    "Tags: #{@tags.inspect} Frequency: #{@frequency}"
  end
end

#SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
SCHEDULER = Rufus::Scheduler.new

# Add "global" error handler to the rufus-scheduler instance
#
def SCHEDULER.on_error(job, error)

  Rails.logger.error(
    "xxx err#{error.object_id} rufus-scheduler intercepted #{error.inspect}" +
    " in job #{job.to_report_s}")
  error.backtrace.each_with_index do |line, i|
    Rails.logger.error(
      "xxx err#{error.object_id} #{i}: #{line}")
  end
end

Rails.logger.info("xxx SCHEDULER started: #{SCHEDULER.object_id}")

unless (
  defined?(Rails::Console) || File.split($0).last == 'rake' ||
  SCHEDULER.down?
)
  # launch all Metrics jobs that are active

  Metric.active.each do |metric|

    SCHEDULER.every(
      metric.frequency,
      :tags => metric.id,
      :overlap => false,
      :timeout => metric.try(:timeout) || '3m'
    ) do |job|
      Rails.logger.info "xxx trigger (scheduled in initializer) #{job.to_report_s}"
      metric.add_value
    end
  end
end

# models/metric.rb

class Metric < ApplicationRecord

  after_save :update_job

  private

  def update_job

    lip = "Metric #{id}" # logger info prefix
    logger.info("xxx #{lip} #update_job active: #{self.active.inspect}")

    if self.changed.present?

      logger.info("xxx #{lip} #update_job 0 SCHEDULER #{SCHEDULER.to_report_s}")

      #job = SCHEDULER.jobs(tag: self.id).first
      #if job.present?
      if job = SCHEDULER.jobs(tag: self.id).first
        logger.info "xxx #{lip} found Job #{job.to_report_s}"
        #SCHEDULER.unschedule(job.id)
        job.unschedule
        logger.info "xxx #{lip} Job #{job.id} unscheduled"
      end

      logger.info("xxx #{lip} #update_job 1 SCHEDULER #{SCHEDULER.to_report_s}")

      if self.active
        metric = self
        job_id = SCHEDULER.every(
          self.frequency,
          :tags => self.id,
          :overlap => false,
          :timeout => self.try(:timeout) || '3m'
        ) do |job|
          logger.info "xxx trigger (scheduled in #update_job) #{job.to_report_s}"
          metric.add_value
        end
        logger.info "xxx #{lip} Job #{job_id} scheduled"
      end

      logger.info("xxx #{lip} #update_job 2 SCHEDULER #{SCHEDULER.to_report_s}")
    end
  end
end

Phase 3 - 2016-12-21

Could you please retry with the following code:

# initializers/scheduler.rb

require 'rufus-scheduler'

class Rufus::Scheduler
  def to_report_a
    a = []
    a << "#{self.class} #{Rufus::Scheduler::VERSION} #{object_id}"
    a << "down? #{self.down?}"
    a << "Process.pid #{Process.pid}"
    a << "jobs:"
    jobs.each_with_index { |job, i| a << "  #{i}: #{job.to_report_s}" }
    a.collect(&:to_s)
  end
end
class Rufus::Scheduler::Job
  def to_report_s
    {
      j: self.class, i: @id, oi: object_id, ts: @tags,
      frq: @frequency, ua: @unscheduled_at, c: count, nt: next_time
    }
      .collect { |k, v| "#{k}: #{v.inspect}" }
      .join(' ')
  end
end

#SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
SCHEDULER = Rufus::Scheduler.new

# Add "global" error handler to the rufus-scheduler instance
#
def SCHEDULER.on_error(job, error)
  lep = "err#{error.object_id}" # logger error prefix ("xxx " is prepended below)
  Rails.logger.error(
    "xxx #{lep} rufus-scheduler intercepted #{error.inspect}" +
    " in job #{job.to_report_s}")
  error.backtrace.each_with_index do |line, i|
    Rails.logger.error(
      "xxx #{lep} #{i}: #{line}")
  end
  Rails.logger.error("xxx #{lep} scheduler:")
  SCHEDULER.to_report_a.each { |l| Rails.logger.error("xxx #{lep} #{l}") }
end

Rails.logger.info("xxx SCHEDULER started: #{SCHEDULER.object_id}")

unless (
  defined?(Rails::Console) || File.split($0).last == 'rake' ||
  SCHEDULER.down?
)
  # launch all Metrics jobs that are active

  Metric.active.each do |metric|

    lip = "Metric #{metric.id}" # logger info prefix

    job_id =
      SCHEDULER.every(
        metric.frequency,
        :tags => metric.id,
        :overlap => false,
        :timeout => metric.try(:timeout) || '3m'
      ) do |job|
        Rails.logger.info(
          "xxx #{lip} trigger (scheduled in initializer) " +
          "#{job.to_report_s}")
        metric.add_value
      end
    Rails.logger.info(
      "xxx #{lip} Job #{job_id} scheduled in initializer " +
      "for Metric #{metric.id}")
  end

  SCHEDULER.to_report_a.each { |l| Rails.logger.info("xxx sc #{l}") }
  Rails.logger.info("xxx initializer over.")
end

# models/metric.rb

class Metric < ApplicationRecord

  after_save :update_job

  private

  def update_job

    lip = "Metric #{id}" # logger info prefix
    logger.info(
      "xxx #{lip} #update_job " +
      "changed: #{self.changed.inspect} active: #{self.active.inspect}")

    return unless self.changed.present?

    SCHEDULER.to_report_a
      .each { |l| logger.info("xxx #{lip} sc #update_job 0 #{l}") }

    jobs = SCHEDULER.jobs(tag: self.id)

    jobs.each_with_index do |job, i|
      logger.info(
        "xxx #{lip} found Job #{job.to_report_s} #{i == 0 ? 'first' : ''}")
    end

    if job = jobs.first
      #SCHEDULER.unschedule(job.id)
      job.unschedule
      logger.info "xxx #{lip} Job #{job.id} unscheduled"
      logger.info "xxx #{lip} Job #{job.id} scheduled? #{SCHEDULER.scheduled?(job.id)}"
    end

    SCHEDULER.to_report_a
      .each { |l| logger.info("xxx #{lip} sc #update_job 1 #{l}") }

    return unless self.active

    metric = self
    job_id = SCHEDULER.every(
      self.frequency,
      :tags => self.id,
      :overlap => false,
      :timeout => self.try(:timeout) || '3m'
    ) do |job|
      logger.info "xxx trigger (scheduled in #update_job) #{job.to_report_s}"
      metric.add_value
    end
    logger.info "xxx #{lip} Job #{job_id} scheduled in #update_job"

    SCHEDULER.to_report_a
      .each { |l| logger.info("xxx #{lip} sc #update_job 2 #{l}") }
  end
end

Phase 4 - 2016-12-22

Added phase 3 logs to question. It looks like somehow a new scheduler process is subsequently created and then destroyed inside the model code. Thanks again for your diligence on this!

Would that really happen in the model code? Your logs tell us it happens in another process. Your initial Ruby process instantiates rufus-scheduler, and then your HTTP requests are served in worker processes, which are forks of your initial process (without the threads; in other words, with inactive schedulers).
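To see why a scheduler created before the fork ends up inactive in a worker, here is a minimal sketch using only the Ruby standard library. It is not rufus-scheduler or Puma code: the `ticker` thread is a hypothetical stand-in for the scheduler's own thread, and the sketch assumes a platform where `Kernel#fork` is available.

```ruby
# A background thread standing in for a scheduler thread (hypothetical).
ticker = Thread.new { loop { sleep 0.05 } }

reader, writer = IO.pipe

# Fork a child, roughly as a clustered web server forks its workers.
child_pid = fork do
  reader.close
  # The Thread object was copied into the child, but only the thread
  # that called fork keeps running there.
  writer.puts "#{Process.pid} #{ticker.alive?}"
  writer.close
  exit!(0)
end

writer.close
reported_pid, child_alive = reader.read.split
reader.close
Process.wait(child_pid)

parent_alive = ticker.alive?
puts "parent pid #{Process.pid}: ticker alive? #{parent_alive}"
puts "child  pid #{reported_pid}: ticker alive? #{child_alive}"
```

This mirrors the Phase 3 logs, where the initializer reports Process.pid 39975 while #update_job reports Process.pid 40037: the same SCHEDULER object id, but in a different process.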

You are using Puma in clustered mode. I should have asked about your configuration immediately.

https://github.com/puma/puma#configuration

Read its documentation carefully.

A simple fix is to not use clustered mode, so that only one Ruby process is involved, serving all the HTTP requests.
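For the single-process option, a Puma configuration might look roughly like the fragment below. The file location `config/puma.rb` and the thread counts are assumptions, not taken from the question; the line that matters is `workers 0`, which keeps Puma in single mode so that no worker processes are forked.

```ruby
# config/puma.rb (assumed location; adjust to your app)

# 0 workers means single mode: no forked worker processes, so the
# scheduler created in the initializer lives in the same process
# that serves the HTTP requests.
workers 0

# Thread counts here are illustrative only.
threads 1, 5
```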

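On the other hand, if you need clustered mode, you have to think differently. You probably don't want one rufus-scheduler instance per worker process. You could focus on having the core (live) rufus-scheduler in the main process. It could have a "management" job that checks for recently updated metrics and unschedules/schedules jobs.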

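SCHEDULER.every '10s', overlap: false do
  Metric.recently_updated.each do |metric|
    # drop any job currently scheduled for this metric
    SCHEDULER.jobs(tags: metric.id).each(&:unschedule)
    # reschedule only the metrics that are still active
    next unless metric.active
    SCHEDULER.every(metric.frequency, tags: metric.id) { metric.add_value }
  end
end
  # or something like that...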
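The reconcile step of such a management job can be tried out without Rails or rufus-scheduler. The sketch below is only an illustration under stated assumptions: `FakeScheduler`, `MetricRow` and `reconcile` are hypothetical names, and the fake scheduler merely records jobs in memory instead of running them.

```ruby
# Hypothetical in-memory stand-in for a scheduler; it only records jobs.
class FakeScheduler
  Job = Struct.new(:id, :tag, :frequency)

  def initialize
    @jobs = {}
    @next_id = 0
  end

  # Record a job for the given tag and return its id.
  def every(frequency, tag:)
    id = (@next_id += 1)
    @jobs[id] = Job.new(id, tag.to_s, frequency)
    id
  end

  # All jobs, or only those carrying the given tag.
  def jobs(tag: nil)
    all = @jobs.values
    tag ? all.select { |j| j.tag == tag.to_s } : all
  end

  def unschedule(job_id)
    @jobs.delete(job_id)
  end
end

# Hypothetical stand-in for the Metric model.
MetricRow = Struct.new(:id, :frequency, :active)

# Unschedule every job of each updated metric, then reschedule it
# only if the metric is still active.
def reconcile(scheduler, updated_metrics)
  updated_metrics.each do |metric|
    scheduler.jobs(tag: metric.id).each { |job| scheduler.unschedule(job.id) }
    next unless metric.active
    scheduler.every(metric.frequency, tag: metric.id)
  end
end

scheduler = FakeScheduler.new
scheduler.every('20s', tag: 1)
scheduler.every('20s', tag: 2)

updated = [
  MetricRow.new(2, '20s', false), # deactivated: job removed
  MetricRow.new(3, '1m', true),   # newly activated: job added
  MetricRow.new(1, '5s', true)    # frequency changed: job replaced
]
reconcile(scheduler, updated)

summary = scheduler.jobs.map { |j| [j.tag, j.frequency] }.sort
puts summary.inspect
```

Because all scheduling happens in one place, the workers only need to save the metric; they never touch a scheduler themselves.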

Have fun!