合并频道中的项目
Coalescing items in channel
我有一个函数可以接收任务并将它们放入一个频道。每个任务都有 ID、一些属性和一个放置结果的通道。看起来像这样
task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendReponse(result)
另一个goroutine从通道中取出一个任务,处理它并将结果放入任务的通道
task := <-queue
task.Result <- doExpensiveComputation(task)
这段代码工作正常。但现在我想合并 queue
中的任务。任务处理是一个非常昂贵的操作,所以我想处理一次队列中具有相同 ID 的所有任务。我看到两种方法。
第一个是不要将具有相同ID的任务放入队列,所以当现有任务到达时,它会等待它的副本完成。这是伪代码
if newTask in queue {
existing := queue.getById(newTask.ID)
existing.waitForComplete()
sendResponse(existing.ProcessingResult)
} else {
queue.enqueue(newTask)
}
所以,我可以使用 go channel 和 map 进行随机访问 + 一些同步方式(如互斥锁)来实现它。我不喜欢这种方式的地方是我必须在代码中携带地图和通道并保持它们的内容同步。
第二种方式是将所有任务放入队列,但是当结果到达时,从队列中提取任务和所有具有相同ID的任务,然后将结果发送给所有任务。这是伪代码
someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
theSameTask.Result <- result
}
而且我知道如何以与上述相同的方式使用 chan + map + mutex 来实现它。
问题是:是否有一些 builtin/existing 数据结构可用于此类问题?还有其他(更好的)方法吗?
如果我对问题的理解正确,我想到的最简单的解决方案是在任务发送者(放入 queue
)和工作人员(从 queue
获取)之间添加一个中间层。这可能是例行公事,负责存储当前任务(按 ID)并将结果广播给每个匹配的任务。
伪代码:
go func() {
active := make(map[TaskID][]Task)
for {
select {
case task := <-queue:
tasks := active[task.ID]
// No tasks with such ID, start heavy work
if len(tasks) == 0 {
worker <- task
}
// Save task for the result
active[task.ID] = append(active[task.ID], task)
case r := <-response:
// Broadcast to all tasks
for _, task := range active[r.ID] {
task.Result <- r.Result
}
}
}
}()
不需要互斥体,也可能不需要携带任何东西,工作人员只需将所有结果放入这个中间层,然后正确路由响应。如果冲突 ID 有可能在一段时间内到达,您甚至可以轻松地在此处添加缓存。
编辑: 我梦见上面的代码导致了死锁。如果您一次发送大量请求并阻塞 worker
通道,则会出现一个严重的问题——这个中间层例程卡在 worker <- task
等待工作人员完成,但所有工作人员都可能被阻止发送到响应通道(因为我们的例程无法收集它)。 Playable proof.
人们可以考虑在通道中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以将系统设计成缓冲区永远不会填满的方式)。有几种方法可以解决这个问题;例如,您可以 运行 一个单独的例程来收集响应,但是您需要使用互斥锁来保护 active
映射。可行的。您还可以将 worker <- task
放入 select,它会尝试将任务发送给工作人员、接收新任务(如果没有要发送的任务)或收集响应。人们可以利用 nil 通道永远不会准备好进行通信这一事实(被 select 忽略),因此您可以在单个 select 中交替接收和发送任务。示例:
go func() {
var next Task // received task which needs to be passed to a worker
in := queue // incoming channel (new tasks) -- active
var out chan Task // outgoing channel (to workers) -- inactive
for {
select {
case t := <-in:
next = t // store task, so we can pass to worker
in, out = nil, worker // deactivate incoming channel, activate outgoing
case out <- next:
in, out = queue, nil // deactivate outgoing channel, activate incoming
case r := <-response:
collect <- r
}
}
}()
我有一个函数可以接收任务并将它们放入一个频道。每个任务都有 ID、一些属性和一个放置结果的通道。看起来像这样
task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendReponse(result)
另一个goroutine从通道中取出一个任务,处理它并将结果放入任务的通道
task := <-queue
task.Result <- doExpensiveComputation(task)
这段代码工作正常。但现在我想合并 queue
中的任务。任务处理是一个非常昂贵的操作,所以我想处理一次队列中具有相同 ID 的所有任务。我看到两种方法。
第一个是不要将具有相同ID的任务放入队列,所以当现有任务到达时,它会等待它的副本完成。这是伪代码
if newTask in queue {
existing := queue.getById(newTask.ID)
existing.waitForComplete()
sendResponse(existing.ProcessingResult)
} else {
queue.enqueue(newTask)
}
所以,我可以使用 go channel 和 map 进行随机访问 + 一些同步方式(如互斥锁)来实现它。我不喜欢这种方式的地方是我必须在代码中携带地图和通道并保持它们的内容同步。
第二种方式是将所有任务放入队列,但是当结果到达时,从队列中提取任务和所有具有相同ID的任务,然后将结果发送给所有任务。这是伪代码
someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
theSameTask.Result <- result
}
而且我知道如何以与上述相同的方式使用 chan + map + mutex 来实现它。
问题是:是否有一些 builtin/existing 数据结构可用于此类问题?还有其他(更好的)方法吗?
如果我对问题的理解正确,我想到的最简单的解决方案是在任务发送者(放入 queue
)和工作人员(从 queue
获取)之间添加一个中间层。这可能是例行公事,负责存储当前任务(按 ID)并将结果广播给每个匹配的任务。
伪代码:
go func() {
active := make(map[TaskID][]Task)
for {
select {
case task := <-queue:
tasks := active[task.ID]
// No tasks with such ID, start heavy work
if len(tasks) == 0 {
worker <- task
}
// Save task for the result
active[task.ID] = append(active[task.ID], task)
case r := <-response:
// Broadcast to all tasks
for _, task := range active[r.ID] {
task.Result <- r.Result
}
}
}
}()
不需要互斥体,也可能不需要携带任何东西,工作人员只需将所有结果放入这个中间层,然后正确路由响应。如果冲突 ID 有可能在一段时间内到达,您甚至可以轻松地在此处添加缓存。
编辑: 我梦见上面的代码导致了死锁。如果您一次发送大量请求并阻塞 worker
通道,则会出现一个严重的问题——这个中间层例程卡在 worker <- task
等待工作人员完成,但所有工作人员都可能被阻止发送到响应通道(因为我们的例程无法收集它)。 Playable proof.
人们可以考虑在通道中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以将系统设计成缓冲区永远不会填满的方式)。有几种方法可以解决这个问题;例如,您可以 运行 一个单独的例程来收集响应,但是您需要使用互斥锁来保护 active
映射。可行的。您还可以将 worker <- task
放入 select,它会尝试将任务发送给工作人员、接收新任务(如果没有要发送的任务)或收集响应。人们可以利用 nil 通道永远不会准备好进行通信这一事实(被 select 忽略),因此您可以在单个 select 中交替接收和发送任务。示例:
go func() {
var next Task // received task which needs to be passed to a worker
in := queue // incoming channel (new tasks) -- active
var out chan Task // outgoing channel (to workers) -- inactive
for {
select {
case t := <-in:
next = t // store task, so we can pass to worker
in, out = nil, worker // deactivate incoming channel, activate outgoing
case out <- next:
in, out = queue, nil // deactivate outgoing channel, activate incoming
case r := <-response:
collect <- r
}
}
}()