Rails ActiveJob 显示过时的数据库数据
Rails ActiveJob showing outdated DB data
我要解决的问题:
多个项目connect/make更新到同一个数据库。 “Project A”的任务是更新数据库,“Project B”是客户端界面,负责显示更新的数据。
解决方案:
当项目 A 计划更新记录时,项目 B 将创建一个后台作业到 运行 15 分钟捕获所做的更新,通过 ActionCable(webhooks)每 15 秒更新一次界面。
问题:
ActiveJob 卡在“3 remaining records
”,没有注意到任何数据库更改。如果直接调用模型(在作业之外),一切都会按预期进行;注意到数据库更改,客户端界面得到更新,大家都很高兴。
但是一旦从作业中调用它(到 运行 后台任务),它就会卡住。
调用模型逻辑的作业:
class BatchWatchJob < ApplicationJob
queue_as :default
def perform(**args)
if args[:batch_number]
p "=== Begin - BatchWatch for batch_number: #{args[:batch_number]} ==="
begin
# Set 15 minute timeout
# ReImport.watch_batch() not updating
# it's count when DB data changes :/
# BUT if I call the method directly (not from this Job), it works
Timeout.timeout(900) { ReImport.watch_batch(args[:batch_number]) }
rescue Timeout::Error
puts '=== TIMEOUT ERROR ==='
puts 'BatchWatch job failed to finish in the requisite amount of time (15 minutes)'
end
else
p "=== Unable to start BatchWatch, batch_number not specified ==="
end
end
end
包含我的逻辑的模型:
class ReImport < ApplicationRecord
[...]
def self.watch_batch(num)
batch = ReImport.where(batch: num)
total = batch.count
remaining = batch.where(completed: false) # NOTE: starts with 3 remaining
remaining_count = remaining.count
p "initial remaining: #{remaining_count}"
while !remaining.empty? do
p 'wait 10 seconds...'
sleep(10) # wait 10 seconds
p '...seconds finished...'
# batch.reload # doesn't work
# remaining.reload # doesn't work
batch = ReImport.where(batch: num)
remaining = batch.where(completed: false) # NOTE: this should now be 2
p "remaining_count: #{remaining_count}"
p "remaining . count: #{remaining.count}"
p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
if remaining_count > remaining.count
p '=== WATCH_BATCH: update found! ==='
# Update count
remaining_count = remaining.count
# Broadcast DataTables to update records
ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
else
p '=== WATCH_BATCH: nothing changed yet ==='
end
end
p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
end
[...]
end
Rails 控制台 - 输出:
> BatchWatchJob.perform_later(batch_number: 7)
Enqueued BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) to Async(default) with arguments: {:batch_number=>7}
=> #<BatchWatchJob:0x000000055c2100 @arguments=[{:batch_number=>7}], @job_id="7d1beeaf-c4d8-4489-885c-13b44d1037cf", @queue_name="default", @priority=nil, @executions=0, @provider_job_id="011e004c-34f5-4c7a-9925-052df1aa1774">
Performing BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) with arguments: {:batch_number=>7}
"=== Begin - BatchWatch for batch_number: 7 ==="
(2.1ms) SET @@SESSION.sql_mode = CONCAT(CONCAT(@@sql_mode, ',STRICT_ALL_TABLES'), ',NO_AUTO_VALUE_ON_ZERO'), @@SESSION.sql_auto_is_null = 0, @@SESSION.wait_timeout = 2147483
(0.9ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7
(0.7ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0
"initial remaining: 3"
ReImport Exists (0.5ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1
"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"remaining . count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
CACHE ReImport Exists (0.0ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1 [["batch", 7], ["completed", 0], ["LIMIT", 1]]
"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"remaining . count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
CACHE ReImport Exists (0.0ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1 [["batch", 7], ["completed", 0], ["LIMIT", 1]]
"wait 10 seconds..."
[...]
Performed BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) in 22863.44ms
我不只是注意到 CACHE ReImport Exists
,还在想办法关闭它...
我发现问题是 SQL 缓存 ,并通过在 while 循环中使用 ActiveRecords 未缓存方法 ReImport.uncached do
修复了它 像这样:
型号
class ReImport < ApplicationRecord
[...]
def self.watch_batch(num)
batch = ReImport.where(batch: num)
total = batch.count
remaining = batch.where(completed: false)
remaining_count = remaining.count
p "initial remaining: #{remaining_count}"
while !remaining.empty? do
ReImport.uncached do # <--- UNCACHE SQL QUERIES
p 'wait 10 seconds...'
sleep(10) # wait 10 seconds
p '...seconds finished...'
batch = ReImport.where(batch: num)
remaining = batch.where(completed: false)
p "remaining_count: #{remaining_count}"
p "remaining . count: #{remaining.count}"
p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
if remaining_count > remaining.count
p '=== WATCH_BATCH: update found! ==='
# Update count
remaining_count = remaining.count
# Broadcast DataTables to update records
ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
else
p '=== WATCH_BATCH: nothing changed yet ==='
end
end
end
p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
end
[...]
end
我要解决的问题:
多个项目connect/make更新到同一个数据库。 “Project A”的任务是更新数据库,“Project B”是客户端界面,负责显示更新的数据。
解决方案:
当项目 A 计划更新记录时,项目 B 将创建一个后台作业到 运行 15 分钟捕获所做的更新,通过 ActionCable(webhooks)每 15 秒更新一次界面。
问题:
ActiveJob 卡在“3 remaining records
”,没有注意到任何数据库更改。如果直接调用模型(在作业之外),一切都会按预期进行;注意到数据库更改,客户端界面得到更新,大家都很高兴。
但是一旦从作业中调用它(到 运行 后台任务),它就会卡住。
调用模型逻辑的作业:
class BatchWatchJob < ApplicationJob
queue_as :default
def perform(**args)
if args[:batch_number]
p "=== Begin - BatchWatch for batch_number: #{args[:batch_number]} ==="
begin
# Set 15 minute timeout
# ReImport.watch_batch() not updating
# it's count when DB data changes :/
# BUT if I call the method directly (not from this Job), it works
Timeout.timeout(900) { ReImport.watch_batch(args[:batch_number]) }
rescue Timeout::Error
puts '=== TIMEOUT ERROR ==='
puts 'BatchWatch job failed to finish in the requisite amount of time (15 minutes)'
end
else
p "=== Unable to start BatchWatch, batch_number not specified ==="
end
end
end
包含我的逻辑的模型:
class ReImport < ApplicationRecord
[...]
def self.watch_batch(num)
batch = ReImport.where(batch: num)
total = batch.count
remaining = batch.where(completed: false) # NOTE: starts with 3 remaining
remaining_count = remaining.count
p "initial remaining: #{remaining_count}"
while !remaining.empty? do
p 'wait 10 seconds...'
sleep(10) # wait 10 seconds
p '...seconds finished...'
# batch.reload # doesn't work
# remaining.reload # doesn't work
batch = ReImport.where(batch: num)
remaining = batch.where(completed: false) # NOTE: this should now be 2
p "remaining_count: #{remaining_count}"
p "remaining . count: #{remaining.count}"
p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
if remaining_count > remaining.count
p '=== WATCH_BATCH: update found! ==='
# Update count
remaining_count = remaining.count
# Broadcast DataTables to update records
ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
else
p '=== WATCH_BATCH: nothing changed yet ==='
end
end
p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
end
[...]
end
Rails 控制台 - 输出:
> BatchWatchJob.perform_later(batch_number: 7)
Enqueued BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) to Async(default) with arguments: {:batch_number=>7}
=> #<BatchWatchJob:0x000000055c2100 @arguments=[{:batch_number=>7}], @job_id="7d1beeaf-c4d8-4489-885c-13b44d1037cf", @queue_name="default", @priority=nil, @executions=0, @provider_job_id="011e004c-34f5-4c7a-9925-052df1aa1774">
Performing BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) with arguments: {:batch_number=>7}
"=== Begin - BatchWatch for batch_number: 7 ==="
(2.1ms) SET @@SESSION.sql_mode = CONCAT(CONCAT(@@sql_mode, ',STRICT_ALL_TABLES'), ',NO_AUTO_VALUE_ON_ZERO'), @@SESSION.sql_auto_is_null = 0, @@SESSION.wait_timeout = 2147483
(0.9ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7
(0.7ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0
"initial remaining: 3"
ReImport Exists (0.5ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1
"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"remaining . count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
CACHE ReImport Exists (0.0ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1 [["batch", 7], ["completed", 0], ["LIMIT", 1]]
"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"remaining . count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
CACHE ReImport Exists (0.0ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1 [["batch", 7], ["completed", 0], ["LIMIT", 1]]
"wait 10 seconds..."
[...]
Performed BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) in 22863.44ms
我不只是注意到 CACHE ReImport Exists
,还在想办法关闭它...
我发现问题是 SQL 缓存 ,并通过在 while 循环中使用 ActiveRecords 未缓存方法 ReImport.uncached do
修复了它 像这样:
型号
class ReImport < ApplicationRecord
[...]
def self.watch_batch(num)
batch = ReImport.where(batch: num)
total = batch.count
remaining = batch.where(completed: false)
remaining_count = remaining.count
p "initial remaining: #{remaining_count}"
while !remaining.empty? do
ReImport.uncached do # <--- UNCACHE SQL QUERIES
p 'wait 10 seconds...'
sleep(10) # wait 10 seconds
p '...seconds finished...'
batch = ReImport.where(batch: num)
remaining = batch.where(completed: false)
p "remaining_count: #{remaining_count}"
p "remaining . count: #{remaining.count}"
p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
if remaining_count > remaining.count
p '=== WATCH_BATCH: update found! ==='
# Update count
remaining_count = remaining.count
# Broadcast DataTables to update records
ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
else
p '=== WATCH_BATCH: nothing changed yet ==='
end
end
end
p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
end
[...]
end