使用 Sidekiq 和 Sidekiq-Cron 在 Rails 中获取主键错误

Getting a Primary Key error in Rails using Sidekiq and Sidekiq-Cron

我有一个 Rails 项目,它使用 Sidekiq 来处理工作任务,并使用 Sidekiq-Cron 来处理调度。不过,我 运行 遇到了问题。我构建了一个控制器(如下)来处理我所有的 API 查询、数据验证,然后将数据插入数据库。所有逻辑都正常运行。

然后我撕掉了实际将 API 数据插入数据库的代码部分,并将其移动到作业 class 中。这样 Controller 方法就可以简单地将所有繁重的工作传递给作业。当我测试它时,所有的逻辑都正常工作。

最后,我创建了一个每分钟调用 Controller 方法的作业,进行验证检查,然后启动另一个作业以保存 API 数据(如有必要)。当我这样做时,逻辑的第一部分似乎起作用了,它插入了新的事件数据,但是它检查这是否是我们第一次看到特定对象的事件的逻辑似乎失败了。结果是 PG 中的主键违规。

代码如下:

控制器

require 'date'

class MonnitOpenClosedSensorsController < ApplicationController

    def holderTester()
        #MonnitschedulerJob.perform_later(nil)
    end

    # Create Sidekiq queue to process new sensor readings
    def queueNewSensorEvents(auth_token, network_id)

        m = Monnit.new("iMonnit", 1)

        # Construct the query to select the most recent communication date for each sensor in the network
        lastEventForEachSensor = MonnitOpenClosedSensor.select('"SensorID", MAX("LastCommunicationDate") as "lastCommDate"')
        lastEventForEachSensor = lastEventForEachSensor.group("SensorID")
        lastEventForEachSensor = lastEventForEachSensor.where('"CSNetID" = ?', network_id)

        todaysDate = Date.today
        sevenDaysAgo = (todaysDate - 7)

        lastEventForEachSensor.each do |event|
            # puts event["lastCommDate"]
            recentEvent = MonnitOpenClosedSensor.select('id, "SensorID", "LastCommunicationDate"')
            recentEvent = recentEvent.where('"CSNetID" = ? AND "SensorID" = ? AND "LastCommunicationDate" = ?', network_id, event["SensorID"], event["lastCommDate"])

            recentEvent.each do |recent|
                message = m.get_extended_sensor(auth_token, recent["SensorID"])
                if message["LastDataMessageMessageGUID"] != recent["id"]
                    MonnitopenclosedsensorJob.perform_later(auth_token, network_id, message["SensorID"])
                    # puts "hi inner"
                    # puts message["LastDataMessageMessageGUID"]
                    # puts recent['id']
                    # puts recent["SensorID"]
                    # puts message["SensorID"]
                    # raise message
                end
            end
        end

        # Queue up any Sensor Events for new sensors
        # This would be sensors we've never seen before, from a Postgres standpoint
        sensors = m.get_sensor_ids(auth_token)
        sensors.each do |sensor|
            sensorCheck = MonnitOpenClosedSensor.select(:SensorID)
            # sensorCheck = MonnitOpenClosedSensor.select(:SensorID)
            sensorCheck = sensorCheck.group(:SensorID)
            sensorCheck = sensorCheck.where('"CSNetID" = ? AND "SensorID" = ?', network_id, sensor)
            # sensorCheck = sensorCheck.where('id = "?"', sensor["LastDataMessageMessageGUID"])

            if sensorCheck.any? == false
                MonnitopenclosedsensorJob.perform_later(auth_token, network_id, sensor) 
            end
        end

    end

end

以上代码中断了新传感器的传感器事件。它没有识别出传感器已经存在,第一个问题,然后没有识别出它试图创建的事件已经保存到数据库中(使用 GUID 进行比较)。

持久化数据的作业

class MonnitopenclosedsensorJob < ApplicationJob
  queue_as :default

  def perform(auth_token, network_id, sensor)
    m = Monnit.new("iMonnit", 1)
    newSensor = m.get_extended_sensor(auth_token, sensor)

    sensorRecord = MonnitOpenClosedSensor.new
    sensorRecord.SensorID = newSensor['SensorID']
    sensorRecord.MonnitApplicationID = newSensor['MonnitApplicationID']
    sensorRecord.CSNetID = newSensor['CSNetID']

    lastCommunicationDatePretty = newSensor['LastCommunicationDate'].scan(/[0-9]+/)[0].to_i / 1000.0
    nextCommunicationDatePretty = newSensor['NextCommunicationDate'].scan(/[0-9]+/)[0].to_i / 1000.0
    sensorRecord.LastCommunicationDate = Time.at(lastCommunicationDatePretty)
    sensorRecord.NextCommunicationDate = Time.at(nextCommunicationDatePretty)

    sensorRecord.id = newSensor['LastDataMessageMessageGUID']
    sensorRecord.PowerSourceID = newSensor['PowerSourceID']
    sensorRecord.Status = newSensor['Status']
    sensorRecord.CanUpdate = newSensor['CanUpdate'] == "true" ? 1 : 0
    sensorRecord.ReportInterval = newSensor['ReportInterval']
    sensorRecord.MinimumThreshold = newSensor['MinimumThreshold']
    sensorRecord.MaximumThreshold = newSensor['MaximumThreshold']
    sensorRecord.Hysteresis = newSensor['Hysteresis']
    sensorRecord.Tag = newSensor['Tag']
    sensorRecord.ActiveStateInterval = newSensor['ActiveStateInterval']
    sensorRecord.CurrentReading = newSensor['CurrentReading']
    sensorRecord.BatteryLevel = newSensor['BatteryLevel']
    sensorRecord.SignalStrength = newSensor['SignalStrength']
    sensorRecord.AlertsActive = newSensor['AlertsActive']
    sensorRecord.AccountID = newSensor['AccountID']
    sensorRecord.CreatedOn = Time.now.getutc
    sensorRecord.CreatedBy = "Monnit Open Closed Sensor Job"
    sensorRecord.LastModifiedOn = Time.now.getutc
    sensorRecord.LastModifiedBy = "Monnit Open Closed Sensor Job"

    sensorRecord.save

    sensorRecord = nil
  end
end

作业每分钟调用控制器

class MonnitschedulerJob < ApplicationJob
  queue_as :default

  def perform(*args)
    m = Monnit.new("iMonnit", 1)
    getImonnitUsers = ImonnitCredential.select('"auth_token", "username", "password"')
    getImonnitUsers.each do |user|
        # puts user["auth_token"]
        # puts user["username"]
        # puts user["password"]

        if user["auth_token"] != nil
            m.logon(user["auth_token"])
        else
            auth_token = m.get_auth_token(user["username"], user["password"])
            auth_token = auth_token["Result"]
        end

        network_list = m.get_network_list(auth_token)
        network_list.each do |network|
            # puts network["NetworkID"]
            MonnitOpenClosedSensorsController.new.queueNewSensorEvents(auth_token, network["NetworkID"])
        end
    end
  end
end

抱歉 post 的长度。我试图包含尽可能多的有关所涉及代码的信息。

编辑

这是扩展传感器的代码,以及 JSON 响应:

def get_extended_sensor(auth_token, sensor_id)
        response = self.class.get("/json/SensorGetExtended/#{auth_token}?SensorID=#{sensor_id}")

        if response['Result'] != "Invalid Authorization Token"
            response['Result']
        else
            response['Result']
        end
    end


{
    "Method": "SensorGetExtended",
    "Result": {
        "ReportInterval": 180,
        "ActiveStateInterval": 180,
        "InactivityAlert": 365,
        "MeasurementsPerTransmission": 1,
        "MinimumThreshold": 4294967295,
        "MaximumThreshold": 4294967295,
        "Hysteresis": 0,
        "Tag": "",
        "SensorID": 189092,
        "MonnitApplicationID": 9,
        "CSNetID": 24391,
        "SensorName": "Open / Closed - 189092",
        "LastCommunicationDate": "/Date(1500999632000)/",
        "NextCommunicationDate": "/Date(1501010432000)/",
        "LastDataMessageMessageGUID": "d474b3db-d843-40ba-8e0e-8c4726b61ec2",
        "PowerSourceID": 1,
        "Status": 0,
        "CanUpdate": true,
        "CurrentReading": "Open",
        "BatteryLevel": 100,
        "SignalStrength": 84,
        "AlertsActive": true,
        "CheckDigit": "QOLP",
        "AccountID": 14728
    }
}

一些想法:

recentEvent = MonnitOpenClosedSensor.select('id, "SensorID", "LastCommunicationDate"') - 

这不是在做任何订购;您假定您在此处检索到的记录是最新记录。

m = Monnit.new("iMonnit", 1)
newSensor = m.get_extended_sensor(auth_token, sensor)

如果没有 get_extended_sensor 的实现细节,就不可能告诉你如何

sensorRecord.id = newSensor['LastDataMessageMessageGUID']

正在解决。

您很可能收到了重复的消息。使用输入数据作为主键几乎不是一个好主意——而是在您的工作中自动生成一个 GUID,将其用作主键,然后使用 LastDataMessageMessageGUID 作为相关 ID。

所以我 运行 遇到的问题如下所示:

  1. 从 API 中提取了一个传感器事件,并在 Sidekiq 中作为辅助作业排队。
  2. 如果队列 运行 有点慢,API 速度或者只是有很多作业要处理,1 分钟的轮询可能会再次命中并拉下相同的传感器事件并将其排队向上。
  3. 随着队列的处理,传感器事件被插入到数据库中,其 GUID 是主键
  4. 随着队列继续追赶自身,它遇到了被安排为次要作业的同一事件。然后这项工作失败了。

我的解决方案是将我的 "does this SensorID and GUID exist in the database" 转移到实际工作中。因此,当作业 运行 时,它要做的第一件事就是再次检查记录是否已经存在。这意味着我要检查两次,但这种快速检查的开销很小。

仍然存在检查可能发生并通过的风险,而另一个作业正在插入记录,然后再将其提交到数据库,然后它可能会失败。但是重试会捕获它,然后在第二轮检查未验证时将其清除为成功过程。然而,话虽如此,检查发生在 API 数据被提取之后。因为从理论上讲,数据库从 API 数据中持久化单个记录会发生得非常快(比 API 调用发生的速度快得多),它确实降低了你必须命中的机会重试任何工作....我的意思是,与第二次检查失败并触发重试相比,你中奖的机会更大。

如果其他人有更好或更干净的解决方案,请随时将其作为辅助答案!