Rufus Scheduler - Help Starting / Stopping Jobs in Model
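After reading the Rufus Scheduler docs about 100 times, every StackOverflow question I could find, and the source code, and finally reading How to Report Bugs Effectively twice, I've reached the point where I need your help, @jmettraux:
The Application
I'm writing an internal application that lets my team create metrics that periodically run code which saves a value to the database. I have the following setup.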
schema.rb
create_table "metrics", force: :cascade do |t|
t.string "frequency"
t.string "name"
t.text "data"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.boolean "active"
end
initializers/scheduler.rb
require 'rufus-scheduler'
class Rufus::Scheduler::Job
def report
logger = Logger.new(STDOUT)
logger.info "Job: #{@id} Tags: #{@tags} Frequency: #{@frequency}"
end
end
SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
unless defined?(Rails::Console) || File.split($0).last == 'rake'
#Launch all Metrics jobs that are active
@metrics = Metric.active
unless SCHEDULER.down?
@metrics.each_with_index do | metric, index |
SCHEDULER.every metric.frequency, :tags => metric.id, :overlap => false, :timeout => metric.try(:timeout) || '3m' do | job |
metric.add_value
job.report
end
end
end
end
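What Works
The code above works perfectly: Rufus initializes correctly and all of the jobs start as expected.
The Problem
The issue I'm running into is that once the app is running, I need to be able to unschedule jobs and reschedule them when a metric is saved. When I create each job, I use Rufus tags to give it a tag containing the metric ID, and I use that tag to look the jobs up in order to unschedule them.
Here is my current Metric model with the relevant code.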
models/metric.rb
class Metric < ApplicationRecord
...
after_save :update_job
private
def update_job
if self.changed.present?
job = SCHEDULER.jobs(tag: self.id).first
if job.present?
logger.info "Job #{job.id}, for Metric #{self.id} unscheduled."
SCHEDULER.unschedule(job.id)
end
if self.active
metric = self
new_job = SCHEDULER.every self.frequency, :tags => self.id, :overlap => false, :timeout => self.try(:timeout) || '3m' do | job |
metric.add_value
end
logger.info "Job #{new_job} for Metric #{self.id} scheduled."
end
end
end
end
The Actual Question!
Scenario 1:
- Application Starts
- scheduler initializer loads all active metrics
- scheduler initializer starts all active metrics as 'every' jobs
- User edits an active metric, setting it to inactive
- The update_job method runs after the metric is saved
- inside the update_job method, the job for the current metric is found successfully
- the unschedule method is called on the job
- PROBLEM HERE: The job runs again even after being unscheduled
Scenario 2:
- Application Starts
- scheduler initializer loads all active metrics
- scheduler initializer starts all active metrics as 'every' jobs
- User edits an inactive metric, setting it to active
- The update_job method runs after the metric is saved
- inside the update_job method, a new 'every' job is scheduled for the metric
- PROBLEM HERE: The job never gets started
What I've Tried
I tried adding the following code to the bottom of the initializer to make sure the unschedule method works properly in my application.
sleep 50
job = SCHEDULER.jobs(tag: 2).first
SCHEDULER.unschedule(job)
The metric with ID 2 has a frequency of 20s. It runs twice and then gets unscheduled correctly, so I know it works properly outside of the model code.
System/Application Info
- Ruby 2.3.0
- Rails 5.0.0.1
- Puma 3.6.0
- OS X 10.12.1 Sierra
- rufus-scheduler 3.2.2
Phase 2 Logs - 12/20/2016
Scenario 1:
xxx SCHEDULER started: 70168058449780
xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
xxx Metric 2 #update_job active: false
xxx Metric 2 #update_job 0 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 2 interval_jobs: 0 cron_jobs: 0
xxx Metric 2 found Job Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
xxx Metric 2 Job every_1482273018.01852_1684784125738549751 unscheduled
xxx Metric 2 #update_job 1 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
xxx Metric 2 #update_job 2 SCHEDULER Rufus::Scheduler 3.2.2 70168058449780 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
xxx trigger (scheduled in initializer) Job: Rufus::Scheduler::EveryJob every_1482273018.01852_1684784125738549751 (70168045492220) Tags: ["2"] Frequency: 20.0
Scenario 2:
xxx SCHEDULER started: 70272158805840
xxx Metric 2 #update_job active: true
xxx Metric 2 #update_job 0 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
xxx Metric 2 #update_job 1 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 1 interval_jobs: 0 cron_jobs: 0
xxx Metric 2 Job every_1482273159.799382_1489585858424637616 scheduled
xxx Metric 2 #update_job 2 SCHEDULER Rufus::Scheduler 3.2.2 70272158805840 down? false at_jobs: 0 in_jobs: 0 every_jobs: 2 interval_jobs: 0 cron_jobs: 0
Phase 3 Logs - 12/20/2016
Scenario 1:
xxx SCHEDULER started: 70321240613760
xxx sc Rufus::Scheduler 3.2.2 70321240613760
xxx sc down? false
xxx sc Process.pid 39975
xxx sc jobs:
xxx sc 0: j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600
xxx initializer over.
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
xxx Metric 2 #update_job changed: ["active", "updated_at"] active: false
xxx Metric 2 sc #update_job 0 Rufus::Scheduler 3.2.2 70321240613760
xxx Metric 2 sc #update_job 0 down? false
xxx Metric 2 sc #update_job 0 Process.pid 40037
xxx Metric 2 sc #update_job 0 jobs:
xxx Metric 2 sc #update_job 0 0: j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600
xxx Metric 2 found Job j: Rufus::Scheduler::EveryJob i: "every_1482335616.453129_4198855636205920237" oi: 70321198587560 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 09:53:56 -0600 first
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 unscheduled
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled? false
xxx Metric 2 sc #update_job 1 Rufus::Scheduler 3.2.2 70321240613760
xxx Metric 2 sc #update_job 1 down? false
xxx Metric 2 sc #update_job 1 Process.pid 40037
xxx Metric 2 sc #update_job 1 jobs:
xxx Metric 2 Job every_1482335616.453129_4198855636205920237 scheduled in initializer for Metric 2
Scenario 2:
xxx SCHEDULER started: 70152969118240
xxx sc Rufus::Scheduler 3.2.2 70152969118240
xxx sc down? false
xxx sc Process.pid 40811
xxx sc jobs:
xxx initializer over.
xxx Metric 2 #update_job changed: ["active", "updated_at"] active: true
xxx Metric 2 sc #update_job 0 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 0 down? false
xxx Metric 2 sc #update_job 0 Process.pid 40872
xxx Metric 2 sc #update_job 0 jobs:
xxx Metric 2 sc #update_job 1 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 1 down? false
xxx Metric 2 sc #update_job 1 Process.pid 40872
xxx Metric 2 sc #update_job 1 jobs:
xxx Metric 2 Job every_1482336051.9823449_3069051954259909338 scheduled in #update_job
xxx Metric 2 sc #update_job 2 Rufus::Scheduler 3.2.2 70152969118240
xxx Metric 2 sc #update_job 2 down? false
xxx Metric 2 sc #update_job 2 Process.pid 40872
xxx Metric 2 sc #update_job 2 jobs:
xxx Metric 2 sc #update_job 2 0: j: Rufus::Scheduler::EveryJob i: "every_1482336051.9823449_3069051954259909338" oi: 70152971745840 ts: ["2"] frq: 20.0 ua: nil c: 0 nt: 2016-12-21 10:01:11 -0600
When the above code runs for an active metric going inactive, I successfully get the logger messages and load the right job. Also, when I use better_errors to inspect the code at that point, the job itself sets its @unscheduled_at attribute correctly; however, the job keeps getting rescheduled to run again.
On the other side, when the update_job method runs and tries to reschedule the job, the job never starts.
So you're trying to say that the job gets rescheduled, but the new, rescheduled job never triggers?
If I look naively at your code (rehashed for better understanding):
def update_job
if self.changed.present?
job = SCHEDULER.jobs(tag: self.id).first
if job.present?
logger.info "Job #{job.id}, for Metric #{self.id} unscheduled."
job.unschedule
end
if self.active
metric = self
new_job_id =
SCHEDULER.every(
self.frequency,
:tags => self.id,
:overlap => false,
:timeout => self.try(:timeout) || '3m'
) { |job| metric.add_value }
logger.info "Job #{new_job_id} for Metric #{self.id} scheduled."
end
end
end
Can the symptoms you describe be boiled down to "the application never enters the reschedule block of the update_job method"?
The Metric with ID 2 has a frequency of 20s. It runs twice then gets unscheduled correctly so I know its working properly outside of the model code.
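So the problem is in your model code; there's little chance it's related to rufus-scheduler.
Please carefully rephrase your "actual question". In its current version, it confuses me.
Phase 2 - 2016-12-21
Could you try the following code?
Warning: it's untested and may contain errors that you will certainly have to find and fix to make it usable.
Then try your two scenarios and report back with the logs emitted by this code, grepped for "xxx ". They might tell us what's going wrong.
Thanks in advance.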
# initializers/scheduler.rb
require 'rufus-scheduler'
class Rufus::Scheduler
def to_report_s
a = []
a << self.class
a << Rufus::Scheduler::VERSION
a << self.object_id
a << "down? #{self.down?}"
%w[ at in every interval cron ].each do |flav|
m = "#{flav}_jobs".to_sym
a << "#{m}: #{self.send(m).size}"
end
a.collect(&:to_s).join(' ')
end
end
class Rufus::Scheduler::Job
def to_report_s
"Job: #{self.class} #{@id} (#{self.object_id}) " +
"Tags: #{@tags.inspect} Frequency: #{@frequency}"
end
end
#SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
SCHEDULER = Rufus::Scheduler.new
# Add "global" error handler to the rufus-scheduler instance
#
def SCHEDULER.on_error(job, error)
Rails.logger.error(
"xxx err#{error.object_id} rufus-scheduler intercepted #{error.inspect}" +
" in job #{job.to_report_s}")
error.backtrace.each_with_index do |line, i|
Rails.logger.error(
"xxx err#{error.object_id} #{i}: #{line}")
end
end
logger.info("xxx SCHEDULER started: #{SCHEDULER.object_id}")
unless (
defined?(Rails::Console) || File.split($0).last == 'rake' ||
SCHEDULER.down?
)
# launch all Metrics jobs that are active
Metric.active.each do |metric|
SCHEDULER.every(
metric.frequency,
:tags => metric.id,
:overlap => false,
:timeout => metric.try(:timeout) || '3m'
) do |job|
logger.info "xxx trigger (scheduled in initializer) #{job.to_report_s}"
metric.add_value
end
end
end
and
# models/metric.rb
class Metric < ApplicationRecord
after_save :update_job
private
def update_job
lip = "Metric #{id}" # logger info prefix
logger.info("xxx #{lip} #update_job active: #{self.active.inspect}")
if self.changed.present?
logger.info("xxx #{lip} #update_job 0 SCHEDULER #{SCHEDULER.to_report_s}")
#job = SCHEDULER.jobs(tag: self.id).first
#if job.present?
if job = SCHEDULER.jobs(tag: self.id).first
logger.info "xxx #{lip} found Job #{job.to_report_s}"
#SCHEDULER.unschedule(job.id)
job.unschedule
logger.info "xxx #{lip} Job #{job.id} unscheduled"
end
logger.info("xxx #{lip} #update_job 1 SCHEDULER #{SCHEDULER.to_report_s}")
if self.active
metric = self
job_id = SCHEDULER.every(
self.frequency,
:tags => self.id,
:overlap => false,
:timeout => self.try(:timeout) || '3m'
) do |job|
logger.info "xxx trigger (scheduled in #updateJob) #{job.to_report_s}"
metric.add_value
end
logger.info "xxx #{lip} Job #{job_id} scheduled"
end
logger.info("xxx #{lip} #update_job 2 SCHEDULER #{SCHEDULER.to_report_s}")
end
end
end
Phase 3 - 2016-12-21
Could you please retry with the following code:
# initializers/scheduler.rb
require 'rufus-scheduler'
class Rufus::Scheduler
def to_report_a
a = []
a << "#{self.class} #{Rufus::Scheduler::VERSION} #{object_id}"
a << "down? #{self.down?}"
a << "Process.pid #{Process.pid}"
a << "jobs:"
jobs.each_with_index { |job, i| a << " #{i}: #{job.to_report_s}" }
a.collect(&:to_s)
end
end
class Rufus::Scheduler::Job
def to_report_s
{
j: self.class, i: @id, oi: object_id, ts: @tags,
frq: @frequency, ua: @unscheduled_at, c: count, nt: next_time
}
.collect { |k, v| "#{k}: #{v.inspect}" }
.join(' ')
end
end
#SCHEDULER = Rufus::Scheduler.new(:lockfile => ".rufus-scheduler.lock")
SCHEDULER = Rufus::Scheduler.new
# Add "global" error handler to the rufus-scheduler instance
#
def SCHEDULER.on_error(job, error)
lep = "err#{error.object_id}" # log error prefix; "xxx " is prepended at each call below
Rails.logger.error(
"xxx #{lep} rufus-scheduler intercepted #{error.inspect}" +
" in job #{job.to_report_s}")
error.backtrace.each_with_index do |line, i|
Rails.logger.error(
"xxx #{lep} #{i}: #{line}")
end
Rails.logger.error("xxx #{lep} scheduler:")
SCHEDULER.to_report_a.each { |l| Rails.logger.error("xxx #{lep} #{l}") }
end
logger.info("xxx SCHEDULER started: #{SCHEDULER.object_id}")
unless (
defined?(Rails::Console) || File.split($0).last == 'rake' ||
SCHEDULER.down?
)
# launch all Metrics jobs that are active
Metric.active.each do |metric|
lip = "Metric #{metric.id}" # logger info prefix
job_id =
SCHEDULER.every(
metric.frequency,
:tags => metric.id,
:overlap => false,
:timeout => metric.try(:timeout) || '3m'
) do |job|
logger.info(
"xxx #{lip} trigger (scheduled in initializer) " +
"#{job.to_report_s}")
metric.add_value
end
logger.info(
"xxx #{lip} Job #{job_id} scheduled in initializer " +
"for Metric #{metric.id}")
end
SCHEDULER.to_report_a.each { |l| logger.info("xxx sc #{l}") }
logger.info("xxx initializer over.")
end
and
# models/metric.rb
class Metric < ApplicationRecord
after_save :update_job
private
def update_job
lip = "Metric #{id}" # logger info prefix
logger.info(
"xxx #{lip} #update_job " +
"changed: #{self.changed.inspect} active: #{self.active.inspect}")
return unless self.changed.present?
SCHEDULER.to_report_a
.each { |l| logger.info("xxx #{lip} sc #update_job 0 #{l}") }
jobs = SCHEDULER.jobs(tag: self.id)
jobs.each_with_index do |job, i|
logger.info(
"xxx #{lip} found Job #{job.to_report_s} #{i == 0 ? 'first' : ''}")
end
if job = jobs.first
#SCHEDULER.unschedule(job.id)
job.unschedule
logger.info "xxx #{lip} Job #{job.id} unscheduled"
logger.info "xxx #{lip} Job #{job.id} scheduled? #{SCHEDULER.scheduled?(job.id)}"
end
SCHEDULER.to_report_a
.each { |l| logger.info("xxx #{lip} sc #update_job 1 #{l}") }
return unless self.active
metric = self
job_id = SCHEDULER.every(
self.frequency,
:tags => self.id,
:overlap => false,
:timeout => self.try(:timeout) || '3m'
) do |job|
logger.info "xxx trigger (scheduled in #update_job) #{job.to_report_s}"
metric.add_value
end
logger.info "xxx #{lip} Job #{job_id} scheduled in #update_job"
SCHEDULER.to_report_a
.each { |l| logger.info("xxx #{lip} sc #update_job 2 #{l}") }
end
end
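Phase 4 - 2016-12-22
Added the Phase 3 logs to the question. It looks like somehow a new scheduler process is created and then destroyed inside the model code. Thanks again for your diligence on this!
Does that really happen in the model code? Your logs tell us it happens in another process: the initializer reports Process.pid 39975 while #update_job reports Process.pid 40037 (40811 versus 40872 in Scenario 2). Your initial Ruby process instantiates rufus-scheduler, and then your HTTP requests are served by worker processes that are forks of that initial process, without its threads; in other words, with an inactive scheduler.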
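This is the mechanism behind Scenario 2. Ruby's fork copies only the thread that calls it, so a worker forked after the initializer still has a SCHEDULER object but no scheduler thread to trigger the jobs added to it. Below is a minimal plain-Ruby sketch of that behaviour, assuming a fork-capable OS (macOS or Linux, as Puma cluster mode also requires). The ticker thread is a hypothetical stand-in for rufus-scheduler's background thread, not its actual internals.

```ruby
# Plain Ruby, no Rails or rufus-scheduler needed.

# Hypothetical stand-in for the thread rufus-scheduler starts in the initializer.
ticker = Thread.new { loop { sleep 0.05 } }

reader, writer = IO.pipe
pid = fork do
  reader.close
  # Inside the child (think "Puma worker"): how many threads exist,
  # and is the copied ticker thread still running?
  writer.puts "#{Thread.list.size} #{ticker.alive?}"
  writer.close
  exit!(0) # skip at_exit hooks inherited from the parent
end
writer.close
child_threads, child_alive = reader.read.split
reader.close
Process.wait(pid)

child_thread_count  = Integer(child_threads)
child_ticker_alive  = (child_alive == "true")
parent_ticker_alive = ticker.alive?
ticker.kill

puts "parent: ticker alive? #{parent_ticker_alive}"
puts "child:  #{child_thread_count} thread(s), ticker alive? #{child_ticker_alive}"
```

The parent should report a live ticker, while the child should report a single thread and a dead ticker: anything scheduled in the child has nothing to fire it.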
You are using Puma in cluster mode. I should have asked about your configuration right away.
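Scenario 1 follows from the same fork. Each worker gets its own copy of the parent's memory, so unscheduling in a worker only edits that worker's copy of the job list, while the parent's live scheduler keeps triggering the original job. A plain-Ruby sketch, in which the jobs hash is a hypothetical stand-in for the scheduler's job table (keyed by the metric id tag):

```ruby
# Plain Ruby; requires a platform with fork (macOS, Linux).

jobs = { "2" => "20s" } # parent's "job table": metric id tag => frequency

reader, writer = IO.pipe
pid = fork do
  reader.close
  jobs.delete("2")  # like job.unschedule inside a Puma worker
  jobs["3"] = "10s" # like SCHEDULER.every inside a Puma worker
  writer.puts jobs.keys.sort.join(",")
  writer.close
  exit!(0)
end
writer.close
child_keys = reader.read.strip.split(",")
reader.close
Process.wait(pid)

parent_keys = jobs.keys.sort
puts "child sees:  #{child_keys.inspect}"
puts "parent sees: #{parent_keys.inspect}"
```

The child ends up with only "3" and the parent still has only "2", which matches the logs: the worker reports the job as unscheduled, yet the initializer's job keeps triggering.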
Carefully read its documentation at https://github.com/puma/puma#configuration
A simple workaround is not to use cluster mode, so that only one Ruby process is involved, serving all the HTTP requests.
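For the single-mode workaround, a config/puma.rb along these lines should keep the initializer and the request handling in one process. This is a sketch assuming the app is started by Puma reading config/puma.rb. The thread counts are placeholders, and any -w/--workers flag on the command line would have to go as well.

```ruby
# config/puma.rb (sketch): single mode, so the process that ran
# config/initializers/scheduler.rb is also the one serving requests.
workers 0    # no forked workers: Puma stays out of cluster mode
threads 5, 5 # placeholder pool size; requests still run on threads
```

In single mode, #update_job runs on a Puma request thread while the rufus-scheduler thread lives in the same process, so job changes made in the model affect the live scheduler.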
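On the other hand, if you need cluster mode, you have to think differently. You probably don't want one rufus-scheduler instance per worker. Instead, focus on having the core (live) rufus-scheduler in the main process. It could have a "management" job that checks for recently updated metrics and unschedules/schedules their jobs.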
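SCHEDULER.every '10s', overlap: false do
# Metric.recently_updated is a hypothetical scope (e.g. metrics updated in the last 10s)
Metric.recently_updated.each do |metric|
SCHEDULER.jobs(tags: metric.id).each(&:unschedule)
next unless metric.active # deactivated metrics only get unscheduled
SCHEDULER.every(metric.frequency, tags: metric.id) { metric.add_value }
end
end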
# or something like that...
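The management job's core decision, which jobs to drop and which to (re)create, can be sketched independently of Rails and rufus-scheduler. The reconcile helper and Plan struct below are hypothetical; plain hashes of metric id => frequency stand in for the active Metric rows and for the jobs the scheduler is currently running. Diffing against what is actually scheduled also catches deactivated metrics and frequency changes without relying on a recently_updated scope.

```ruby
# Hypothetical reconciliation step for the "management" job.
Plan = Struct.new(:unschedule, :schedule)

# desired: { metric_id => frequency } for active metrics in the database
# running: { metric_id => frequency } for jobs currently scheduled
def reconcile(desired, running)
  # Running jobs that are gone from the DB or whose frequency changed.
  stale = running.select { |id, freq| desired[id] != freq }.keys
  # Active metrics with no job yet, or with a job at the wrong frequency.
  fresh = desired.select { |id, freq| running[id] != freq }.keys
  Plan.new(stale.sort, fresh.sort)
end

desired = { 1 => "20s", 2 => "1m" }             # metric 3 was deactivated
running = { 1 => "20s", 2 => "30s", 3 => "5m" } # metric 2's frequency changed

plan = reconcile(desired, running)
puts "unschedule: #{plan.unschedule.inspect}"
puts "schedule:   #{plan.schedule.inspect}"
```

Inside the management job, each id in plan.unschedule would go to SCHEDULER.jobs(tag: id).each(&:unschedule), and each id in plan.schedule to a fresh SCHEDULER.every call. How the running hash is built from the scheduler's jobs is left open here.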
Have fun!