如何同步不断的写入和周期性的读取更新

How to synchronize constant writing and periodically reading and updating

定义问题:

我们有这个物联网设备,每个设备都会向我们发送有关汽车位置的日志。我们想计算汽车在线行驶的距离!因此,每当日志到来时(将其放入队列等之后),我们都会这样做:

type Delta struct {
    DeviceId string
    time     int64
    Distance float64
}
var LastLogs = make(map[string]FullLog)
var Distances = make(map[string]Delta)


func addLastLog(l FullLog) {
    LastLogs[l.DeviceID] = l
}
func AddToLogPerDay(l FullLog) {
    //mutex.Lock()
    if val, ok := LastLogs[l.DeviceID]; ok {
        if distance, exist := Distances[l.DeviceID]; exist {
            x := computingDistance(val, l)
            Distances[l.DeviceID] = Delta{
                DeviceId: l.DeviceID,
                time:     distance.time + 1,
                Distance: distance.Distance + x,
            }
        } else {
            Distances[l.DeviceID] = Delta{
                DeviceId: l.DeviceID,
                time:     1,
                Distance: 0,
            }
        }
    }
    addLastLog(l)

}

这基本上是使用效用函数计算距离!所以在 Distances 中每个设备 ID 都映射到一定的行驶距离!现在这里是问题开始的地方:虽然这个距离被添加到 Distances map,但我想要一个 go 例程将这个数据放入数据库但是因为有很多设备和很多日志等等对每个日志执行这个查询这不是一个好主意。所以我需要每 5 秒执行一次,这意味着每 5 秒尝试清空添加到地图的所有最后距离的列表。我写了这个函数:

func UpdateLogPerDayTable() {
    for {
        for _, distance := range Distances {
            logs := model.HourPerDay{}
            result := services.CarDBProvider.DB.Table(model.HourPerDay{}.TableName()).
                Where("created_at >?  AND device_id = ?", getCurrentData(), distance.DeviceId).
                Find(&logs)
            if result.Error != nil && !result.RecordNotFound() {
                log.Infof("Something went wrong while checking the log: %v", result.Error)
            } else {
                if !result.RecordNotFound() {
                    logs.CountDistance = distance.Distance

                    logs.CountSecond = distance.time

                    err := services.CarDBProvider.DB.Model(&logs).
                        Update(map[string]interface{}{
                            "count_second":   logs.CountSecond,
                            "count_distance": logs.CountDistance,
                        })
                    if err.Error != nil {
                        log.Infof("Something went wrong while updating the log: %v", err.Error)
                    }

                } else if result.RecordNotFound() {
                    dayLog := model.HourPerDay{
                        Model:         gorm.Model{},
                        DeviceId:      distance.DeviceId,
                        CountSecond:   int64(distance.time),
                        CountDistance: distance.Distance,
                    }
                    err := services.CarDBProvider.DB.Create(&dayLog)
                    if err.Error != nil {
                        log.Infof("Something went wrong while adding the log: %v", err.Error)
                    }
                }
            }
        }
        time.Sleep(time.Second * 5)
    }
}

它在另一个 go 例程中被调用 go utlis.UpdateLogPerDayTable()。但是这里有很多问题:

  1. 我不知道如何保护 Distances 所以当我将它添加到另一个例程中时,我在其他地方阅读它,一切正常!(问题是我想使用 go channels 而不是不知道该怎么做)
  2. 如何在 go 中安排任务来解决这个问题?
  3. 可能我会添加一个 redis 来存储所有在线或在线设备,这样我就可以更快地执行 select 查询并只更新实际的数据库。还为 redis 添加了一个过期时间,所以如果设备在一段时间内没有发送和数据,它就会消失!我应该把这段代码放在哪里?

抱歉,如果我的解释还不够,但我真的需要一些帮助。专门用于代码实现

Go 在多个渠道上使用 for / select 有一个非常酷的模式。这允许您使用超时和最大记录大小来批处理距离写入。使用此模式需要使用通道。

首先是将您的距离建模为通道:

distances := make(chan Delta)

然后您跟踪当前批次

var deltas []Delta

然后

ticker := time.NewTicker(time.Second * 5)

var deltas []Delta

for {
  select {
     case <-ticker.C:
        // 5 seconds up flush to db
        // reset deltas
     case d := <-distances:
        deltas = append(deltas, d)
        if len(deltas) >= maxDeltasPerFlush {
           // flush
           // reset deltas
        }
  }
}

I don't know how to secure Distances so when I add it in another routine I read it somewhere else ,every thing is ok!(The problem is that I want to use go channels and don't have any idea how to do it)

如果您打算保留地图并共享内存,您需要使用 mutual exclusion (mutex) to synchronize access between go routines. Using a channel allows you to send a copy to a channel, removing the need for synchronizing across the Delta Object. Depending on your architecture you could also create a pipeline of go routines connected by channels, which could make it so only a single go routine (monitor go routine 来保护它)正在访问 Delta,也不需要同步。

How can I schedule tasks in go for this problem?

使用通道作为将 Deltas 传递给不同 go 例程的原语:)

Probably I will add a redis to store all the devices that or online so I could do the select query faster and just update the actual database. also add an expire time for redis so if a device didn't send and data for some time, it vanishes! where should I put this code?

这取决于您完成的架构。你可以为 select 操作写一个 decorator ,它会先检查 redis 然后去数据库。此函数的客户端不必知道这一点。写操作可以以相同的方式完成:写入持久存储,然后使用缓存值和过期时间写回 Redis。使用装饰器,客户端不需要知道这一点,他们只需执行读取和写入,缓存逻辑将在装饰器内部实现。有很多方法可以做到这一点,这在很大程度上取决于您的实施位置。