Rails ActiveJob 显示过时的数据库数据

Rails ActiveJob showing outdated DB data

我要解决的问题:

多个项目connect/make更新到同一个数据库。 “Project A”的任务是更新数据库,“Project B”是客户端界面,负责显示更新的数据。

解决方案:

项目 A 计划更新记录时,项目 B 将创建一个后台作业到 运行 15 分钟捕获所做的更新,通过 ActionCable(webhooks)每 15 秒更新一次界面。

问题:

ActiveJob 卡在“3 remaining records”,没有注意到任何数据库更改。如果直接调用模型(在作业之外),一切都会按预期进行;注意到数据库更改,客户端界面得到更新,大家都很高兴。

但是一旦从作业中调用它(到 运行 后台任务),它就会卡住。


调用模型逻辑的作业:

class BatchWatchJob < ApplicationJob
    queue_as :default

    def perform(**args)
        if args[:batch_number]
            p "=== Begin - BatchWatch for batch_number: #{args[:batch_number]} ==="
            begin
                # Set 15 minute timeout
                # ReImport.watch_batch() not updating 
                # it's count when DB data changes :/
                #   BUT if I call the method directly (not from this Job), it works
                Timeout.timeout(900) { ReImport.watch_batch(args[:batch_number]) }
            rescue Timeout::Error
                puts '=== TIMEOUT ERROR ==='
                puts 'BatchWatch job failed to finish in the requisite amount of time (15 minutes)'
            end
        else
            p "=== Unable to start BatchWatch, batch_number not specified ==="
        end
    end
end

包含我的逻辑的模型:

class ReImport < ApplicationRecord

        [...]

        def self.watch_batch(num)
                batch = ReImport.where(batch: num)
                total = batch.count
                remaining = batch.where(completed: false) # NOTE: starts with 3 remaining
                remaining_count = remaining.count
                p "initial remaining: #{remaining_count}"
                while !remaining.empty? do
                        p 'wait 10 seconds...'

                        sleep(10) # wait 10 seconds

                        p '...seconds finished...'
                        # batch.reload # doesn't work
                        # remaining.reload # doesn't work
                        batch = ReImport.where(batch: num)
                        remaining = batch.where(completed: false) # NOTE: this should now be 2
                        p "remaining_count: #{remaining_count}"
                        p "remaining . count: #{remaining.count}"
                        p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
                        if remaining_count > remaining.count
                                p '=== WATCH_BATCH: update found! ==='
                                # Update count
                                remaining_count = remaining.count

                                # Broadcast DataTables to update records
                                ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
                        else
                                p '=== WATCH_BATCH: nothing changed yet ==='
                        end 
                end 
                p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
        end

        [...]

end

Rails 控制台 - 输出:

 > BatchWatchJob.perform_later(batch_number: 7)

Enqueued BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) to Async(default) with arguments: {:batch_number=>7}

 => #<BatchWatchJob:0x000000055c2100 @arguments=[{:batch_number=>7}], @job_id="7d1beeaf-c4d8-4489-885c-13b44d1037cf", @queue_name="default", @priority=nil, @executions=0, @provider_job_id="011e004c-34f5-4c7a-9925-052df1aa1774"> 

Performing BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) with arguments: {:batch_number=>7}

"=== Begin - BatchWatch for batch_number: 7 ==="
     (2.1ms)  SET  @@SESSION.sql_mode = CONCAT(CONCAT(@@sql_mode, ',STRICT_ALL_TABLES'), ',NO_AUTO_VALUE_ON_ZERO'),  @@SESSION.sql_auto_is_null = 0, @@SESSION.wait_timeout = 2147483
     (0.9ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7
     (0.7ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0
"initial remaining: 3"
    ReImport Exists (0.5ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1
"wait 10 seconds..."
"...seconds finished..."

"remaining_count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"remaining . count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
    CACHE ReImport Exists (0.0ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1  [["batch", 7], ["completed", 0], ["LIMIT", 1]]

"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"remaining . count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
    CACHE ReImport Exists (0.0ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1  [["batch", 7], ["completed", 0], ["LIMIT", 1]]

"wait 10 seconds..."
[...]
Performed BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) in 22863.44ms

我不只是注意到 CACHE ReImport Exists,还在想办法关闭它...

我发现问题是 SQL 缓存 ,并通过在 while 循环中使用 ActiveRecords 未缓存方法 ReImport.uncached do 修复了它 像这样:

型号

class ReImport < ApplicationRecord

    [...]

    def self.watch_batch(num)
        batch = ReImport.where(batch: num)
        total = batch.count
        remaining = batch.where(completed: false)
        remaining_count = remaining.count
        p "initial remaining: #{remaining_count}"
        while !remaining.empty? do
            ReImport.uncached do # <--- UNCACHE SQL QUERIES
                p 'wait 10 seconds...'

                sleep(10) # wait 10 seconds

                p '...seconds finished...'
                batch = ReImport.where(batch: num)
                remaining = batch.where(completed: false)
                p "remaining_count: #{remaining_count}"
                p "remaining . count: #{remaining.count}"
                p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
                if remaining_count > remaining.count
                    p '=== WATCH_BATCH: update found! ==='
                    # Update count
                    remaining_count = remaining.count

                    # Broadcast DataTables to update records
                    ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
                else
                    p '=== WATCH_BATCH: nothing changed yet ==='
                end
            end
        end
        p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
    end

    [...]

end

Source