正确关闭运行无限循环的 Go 例程
Properly close a Go routine which runs an infinite loop
我有一个 go 例程,它基本上充当 KafkaConsumer
,它从主题中读取消息,然后为它收到的每条消息生成另一个 go routine
。现在这个 Consumer go routine
应该在 main go routine
应用程序关闭时关闭。但是我在正确关闭它时遇到困难。
下面是Kafka Consumer
定义
package svc
import (
"event-service/pkg/pb"
"fmt"
"github.com/gogo/protobuf/proto"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
"sync"
)
// EventConsumer handles a single decoded event pulled from Kafka.
type EventConsumer func(event eventService.Event)
// KafkaConsumer wraps a confluent-kafka consumer and dispatches decoded
// events to per-entity-type EventConsumer callbacks.
type KafkaConsumer struct {
done chan bool // signalled from Close to stop the consume loop
eventChannels []string // subscribed topic names
consumer *kafka.Consumer
consumerMapping map[string]EventConsumer // entity type -> handler
wg *sync.WaitGroup // tracks in-flight per-message handler goroutines
}
// getKafkaConsumerConfigMap converts a generic key/value config map into a
// *kafka.ConfigMap. Keys that fail to set are logged and skipped so that a
// single bad entry does not abort consumer construction.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
	configMap := &kafka.ConfigMap{}
	for key, value := range config {
		// log.Printf replaces log.Println(fmt.Sprintf(...)): same output, one call.
		if err := configMap.SetKey(key, value); err != nil {
			log.Printf("An error %v occurred while setting %v: %v", err, key, value)
		}
	}
	return configMap
}
// NewKafkaConsumer builds a KafkaConsumer subscribed to the given topics.
// Construction failures are fatal: the service cannot run without a consumer.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
	consumer, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
	// Check the constructor error immediately; the original interleaved an
	// unrelated channel allocation between the call and its error check.
	if err != nil {
		log.Fatalf("An error %v occurred while starting kafka consumer.", err)
	}
	if err := consumer.SubscribeTopics(channels, nil); err != nil {
		log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
	}
	var wg sync.WaitGroup
	return &KafkaConsumer{
		eventChannels:   channels,
		done:            make(chan bool, 1), // buffered so Close never blocks on the signal
		wg:              &wg,
		consumer:        consumer,
		consumerMapping: consumerMapping,
	}
}
// getEvent decodes a protobuf-encoded Event from raw Kafka message bytes.
// On decode failure the error is logged and the (possibly partially
// populated) event is still returned non-nil, matching callers that never
// nil-check the result.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
	event := eventService.Event{}
	if err := proto.Unmarshal(eventData, &event); err != nil {
		log.Printf("An error %v occurred while un marshalling data from kafka.", err)
	}
	return &event
}
// Consume starts the consumer loop in its own goroutine. The loop exits when
// a value is received on kc.done (sent by Close).
//
// BUG FIX: the original polled kc.done with a non-blocking select (default:)
// and then performed a *blocking* receive from kc.consumer.Events(). With no
// incoming Kafka events the goroutine stayed parked on Events() forever and
// never observed the done signal — which is why shutdown sometimes hung.
// Selecting over both channels in a single select fixes that.
func (kc *KafkaConsumer) Consume() {
	go func() {
		for {
			select {
			case sig := <-kc.done:
				log.Printf("Caught signal %v: terminating \n", sig)
				return
			case e := <-kc.consumer.Events():
				switch event := e.(type) {
				case kafka.AssignedPartitions:
					// Rebalance: take ownership of the newly assigned partitions.
					_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
					if err := kc.consumer.Assign(event.Partitions); err != nil {
						log.Printf("An error %v occurred while assigning partitions.", err)
					}
				case kafka.RevokedPartitions:
					_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
					if err := kc.consumer.Unassign(); err != nil {
						log.Printf("An error %v occurred while unassigning partitions.", err)
					}
				case *kafka.Message:
					// Decode and dispatch each message on its own goroutine,
					// tracked by kc.wg so Close can wait for in-flight handlers.
					domainEvent := kc.getEvent(event.Value)
					kc.wg.Add(1)
					// BUG FIX: the handler now uses its event parameter; the
					// original accepted the parameter but read the captured
					// domainEvent instead, leaving the parameter dead.
					go func(event *eventService.Event) {
						defer kc.wg.Done()
						if eventConsumer := kc.consumerMapping[event.EntityType]; eventConsumer != nil {
							eventConsumer(*event)
						} else {
							log.Printf("Event consumer not found for %v event type", event.EntityType)
						}
					}(domainEvent)
				case kafka.PartitionEOF:
					fmt.Printf("%% Reached %v\n", e)
				case kafka.Error:
					_, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				}
			}
		}
	}()
}
// Close stops the consume loop, waits for in-flight message handlers, then
// closes the underlying Kafka consumer.
//
// BUG FIX: the original waited on the WaitGroup *before* signalling done, so
// the consume loop could keep spawning handler goroutines after the wait had
// already returned, and the consumer could be closed while the loop was still
// reading from it. Signalling done first stops new work; only then is it safe
// to wait and close.
func (kc *KafkaConsumer) Close() {
	kc.done <- true // buffered: never blocks even if the loop already exited
	log.Println("Waiting")
	kc.wg.Wait()
	log.Println("Done waiting")
	if err := kc.consumer.Close(); err != nil {
		log.Printf("An error %v occurred while closing kafka consumer.", err)
	}
}
下面是主线程代码
package main
import (
"event-service/pkg/pb"
"event-service/pkg/svc"
"fmt"
"log"
)
func main() {
// Map of entity type -> handler; "doctor-created" events are just logged.
eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
log.Println(fmt.Sprintf("Got event %v from kafka", event))
}}
consumerConfig := map[string]interface{}{
"bootstrap.servers": "localhost:9092",
"group.id": "catalog",
// Deliver messages and rebalance events via the consumer's Events() channel.
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"enable.partition.eof": true,
"auto.offset.reset": "earliest",
}
kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
kafkaConsumer.Consume()
// NOTE(review): Consume only *starts* the consumer goroutine; Close is
// called immediately afterwards, so the process may shut down before any
// message is ever handled. Nothing here blocks until a termination signal —
// this is the defect the question describes.
kafkaConsumer.Close()
}
这里的问题是应用程序有时根本没有结束,并且在某些运行中它没有执行 consume
函数,我在这里缺少什么?
好的,解决方案在这里。
1. consumer goroutine 的生命周期应当与主 goroutine 绑定:只要主 goroutine 还在运行,consumer goroutine 就应该继续存在;因此正确的做法是在主 goroutine 即将退出时(例如收到终止信号后)再关闭 consumer goroutine,而不是在它还在正常运行时就关闭它。
所以下面的解决方案有效
package main
import (
"event-service/pkg/pb"
"event-service/pkg/svc"
"fmt"
"log"
"sync"
)
func main() {
eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
log.Println(fmt.Sprintf("Got event %v from kafka", event))
}}
consumerConfig := map[string]interface{}{
"bootstrap.servers": "localhost:9092",
"group.id": "catalog-2",
"session.timeout.ms": 6000,
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"enable.partition.eof": true,
"auto.offset.reset": "earliest",
}
var wg sync.WaitGroup
kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
kafkaConsumer.Consume(&wg)
wg.Wait()
kafkaConsumer.Close()
}
服务定义
package svc
import (
"event-service/pkg/pb"
"fmt"
"github.com/gogo/protobuf/proto"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
"os/signal"
"sync"
"syscall"
)
// EventConsumer handles a single decoded event pulled from Kafka.
type EventConsumer func(event eventService.Event)
// KafkaConsumer wraps a confluent-kafka consumer and dispatches decoded
// events to per-entity-type EventConsumer callbacks.
type KafkaConsumer struct {
done chan bool // NOTE(review): unused in this version; sigChan drives shutdown
consumer *kafka.Consumer
consumerMapping map[string]EventConsumer // entity type -> handler
sigChan chan os.Signal // receives SIGINT/SIGTERM to stop the consume loop
channels []string // subscribed topic names
}
// getKafkaConsumerConfigMap converts a generic key/value config map into a
// *kafka.ConfigMap. Keys that fail to set are logged and skipped so that a
// single bad entry does not abort consumer construction.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
	configMap := &kafka.ConfigMap{}
	for key, value := range config {
		// log.Printf replaces log.Println(fmt.Sprintf(...)): same output, one call.
		if err := configMap.SetKey(key, value); err != nil {
			log.Printf("An error %v occurred while setting %v: %v", err, key, value)
		}
	}
	return configMap
}
// NewKafkaConsumer builds a KafkaConsumer subscribed to the given topics.
// Construction failures are fatal: the service cannot run without a consumer.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
	consumer, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
	// Check the constructor error immediately; the original interleaved
	// unrelated channel allocations between the call and its error check.
	if err != nil {
		log.Fatalf("An error %v occurred while starting kafka consumer.", err)
	}
	if err := consumer.SubscribeTopics(channels, nil); err != nil {
		log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
	}
	return &KafkaConsumer{
		channels:        channels,
		sigChan:         make(chan os.Signal, 1), // buffered so signal.Notify never drops the first signal
		done:            make(chan bool, 1),
		consumer:        consumer,
		consumerMapping: consumerMapping,
	}
}
// getEvent decodes a protobuf-encoded Event from raw Kafka message bytes.
// On decode failure the error is logged and the (possibly partially
// populated) event is still returned non-nil, matching callers that never
// nil-check the result.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
	event := eventService.Event{}
	if err := proto.Unmarshal(eventData, &event); err != nil {
		log.Printf("An error %v occurred while un marshalling data from kafka.", err)
	}
	return &event
}
// Consume registers for SIGINT/SIGTERM and starts the consumer loop in its
// own goroutine. wg is incremented once for the loop itself and once per
// in-flight message handler, so the caller's wg.Wait() blocks until a signal
// terminates the loop and all handlers have finished.
func (kc *KafkaConsumer) Consume(wg *sync.WaitGroup) {
	signal.Notify(kc.sigChan, syscall.SIGINT, syscall.SIGTERM)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case sig := <-kc.sigChan:
				fmt.Printf("Caught signal %v: terminating\n", sig)
				return
			case ev := <-kc.consumer.Events():
				switch e := ev.(type) {
				case kafka.AssignedPartitions:
					_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
					// BUG FIX: Assign/Unassign errors were silently discarded
					// with `_ =`; log them as the rest of the package does.
					if err := kc.consumer.Assign(e.Partitions); err != nil {
						log.Printf("An error %v occurred while assigning partitions.", err)
					}
				case kafka.RevokedPartitions:
					_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
					if err := kc.consumer.Unassign(); err != nil {
						log.Printf("An error %v occurred while unassigning partitions.", err)
					}
				case *kafka.Message:
					domainEvent := kc.getEvent(e.Value)
					wg.Add(1)
					// BUG FIX: the handler now uses its event parameter; the
					// original accepted the parameter but read the captured
					// domainEvent instead, leaving the parameter dead.
					go func(event *eventService.Event) {
						defer wg.Done()
						if eventConsumer := kc.consumerMapping[event.EntityType]; eventConsumer != nil {
							eventConsumer(*event)
						} else {
							log.Printf("Event consumer not found for %v event type", event.EntityType)
						}
					}(domainEvent)
				case kafka.PartitionEOF:
					fmt.Printf("%% Reached %v\n", e)
				case kafka.Error:
					// Errors should generally be considered as informational,
					// the client will try to automatically recover.
					_, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				}
			}
		}
	}()
}
// Close shuts down the underlying Kafka consumer, committing final offsets
// and leaving the consumer group. Callers must ensure the consume loop has
// already exited (wg.Wait in main) before invoking Close.
func (kc *KafkaConsumer) Close() {
	if err := kc.consumer.Close(); err != nil {
		log.Printf("An error %v occurred while closing kafka consumer.", err)
	}
}
我有一个 go 例程,它基本上充当 KafkaConsumer
,它从主题中读取消息,然后为它收到的每条消息生成另一个 go routine
。现在这个 Consumer go routine
应该在 main go routine
应用程序关闭时关闭。但是我在正确关闭它时遇到困难。
下面是Kafka Consumer
定义
package svc
import (
"event-service/pkg/pb"
"fmt"
"github.com/gogo/protobuf/proto"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
"sync"
)
// EventConsumer handles a single decoded event pulled from Kafka.
type EventConsumer func(event eventService.Event)
// KafkaConsumer wraps a confluent-kafka consumer and dispatches decoded
// events to per-entity-type EventConsumer callbacks.
type KafkaConsumer struct {
done chan bool // signalled from Close to stop the consume loop
eventChannels []string // subscribed topic names
consumer *kafka.Consumer
consumerMapping map[string]EventConsumer // entity type -> handler
wg *sync.WaitGroup // tracks in-flight per-message handler goroutines
}
// getKafkaConsumerConfigMap converts a generic key/value config map into a
// *kafka.ConfigMap. Keys that fail to set are logged and skipped so that a
// single bad entry does not abort consumer construction.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
	configMap := &kafka.ConfigMap{}
	for key, value := range config {
		// log.Printf replaces log.Println(fmt.Sprintf(...)): same output, one call.
		if err := configMap.SetKey(key, value); err != nil {
			log.Printf("An error %v occurred while setting %v: %v", err, key, value)
		}
	}
	return configMap
}
// NewKafkaConsumer builds a KafkaConsumer subscribed to the given topics.
// Construction failures are fatal: the service cannot run without a consumer.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
	consumer, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
	// Check the constructor error immediately; the original interleaved an
	// unrelated channel allocation between the call and its error check.
	if err != nil {
		log.Fatalf("An error %v occurred while starting kafka consumer.", err)
	}
	if err := consumer.SubscribeTopics(channels, nil); err != nil {
		log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
	}
	var wg sync.WaitGroup
	return &KafkaConsumer{
		eventChannels:   channels,
		done:            make(chan bool, 1), // buffered so Close never blocks on the signal
		wg:              &wg,
		consumer:        consumer,
		consumerMapping: consumerMapping,
	}
}
// getEvent decodes a protobuf-encoded Event from raw Kafka message bytes.
// On decode failure the error is logged and the (possibly partially
// populated) event is still returned non-nil, matching callers that never
// nil-check the result.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
	event := eventService.Event{}
	if err := proto.Unmarshal(eventData, &event); err != nil {
		log.Printf("An error %v occurred while un marshalling data from kafka.", err)
	}
	return &event
}
// Consume starts the consumer loop in its own goroutine. The loop exits when
// a value is received on kc.done (sent by Close).
//
// BUG FIX: the original polled kc.done with a non-blocking select (default:)
// and then performed a *blocking* receive from kc.consumer.Events(). With no
// incoming Kafka events the goroutine stayed parked on Events() forever and
// never observed the done signal — which is why shutdown sometimes hung.
// Selecting over both channels in a single select fixes that.
func (kc *KafkaConsumer) Consume() {
	go func() {
		for {
			select {
			case sig := <-kc.done:
				log.Printf("Caught signal %v: terminating \n", sig)
				return
			case e := <-kc.consumer.Events():
				switch event := e.(type) {
				case kafka.AssignedPartitions:
					// Rebalance: take ownership of the newly assigned partitions.
					_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
					if err := kc.consumer.Assign(event.Partitions); err != nil {
						log.Printf("An error %v occurred while assigning partitions.", err)
					}
				case kafka.RevokedPartitions:
					_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
					if err := kc.consumer.Unassign(); err != nil {
						log.Printf("An error %v occurred while unassigning partitions.", err)
					}
				case *kafka.Message:
					// Decode and dispatch each message on its own goroutine,
					// tracked by kc.wg so Close can wait for in-flight handlers.
					domainEvent := kc.getEvent(event.Value)
					kc.wg.Add(1)
					// BUG FIX: the handler now uses its event parameter; the
					// original accepted the parameter but read the captured
					// domainEvent instead, leaving the parameter dead.
					go func(event *eventService.Event) {
						defer kc.wg.Done()
						if eventConsumer := kc.consumerMapping[event.EntityType]; eventConsumer != nil {
							eventConsumer(*event)
						} else {
							log.Printf("Event consumer not found for %v event type", event.EntityType)
						}
					}(domainEvent)
				case kafka.PartitionEOF:
					fmt.Printf("%% Reached %v\n", e)
				case kafka.Error:
					_, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				}
			}
		}
	}()
}
// Close stops the consume loop, waits for in-flight message handlers, then
// closes the underlying Kafka consumer.
//
// BUG FIX: the original waited on the WaitGroup *before* signalling done, so
// the consume loop could keep spawning handler goroutines after the wait had
// already returned, and the consumer could be closed while the loop was still
// reading from it. Signalling done first stops new work; only then is it safe
// to wait and close.
func (kc *KafkaConsumer) Close() {
	kc.done <- true // buffered: never blocks even if the loop already exited
	log.Println("Waiting")
	kc.wg.Wait()
	log.Println("Done waiting")
	if err := kc.consumer.Close(); err != nil {
		log.Printf("An error %v occurred while closing kafka consumer.", err)
	}
}
下面是主线程代码
package main
import (
"event-service/pkg/pb"
"event-service/pkg/svc"
"fmt"
"log"
)
func main() {
// Map of entity type -> handler; "doctor-created" events are just logged.
eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
log.Println(fmt.Sprintf("Got event %v from kafka", event))
}}
consumerConfig := map[string]interface{}{
"bootstrap.servers": "localhost:9092",
"group.id": "catalog",
// Deliver messages and rebalance events via the consumer's Events() channel.
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"enable.partition.eof": true,
"auto.offset.reset": "earliest",
}
kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
kafkaConsumer.Consume()
// NOTE(review): Consume only *starts* the consumer goroutine; Close is
// called immediately afterwards, so the process may shut down before any
// message is ever handled. Nothing here blocks until a termination signal —
// this is the defect the question describes.
kafkaConsumer.Close()
}
这里的问题是应用程序有时根本没有结束,并且在某些运行中它没有执行 consume
函数,我在这里缺少什么?
好的,解决方案在这里。 1. consumer goroutine 的生命周期应当与主 goroutine 绑定:只要主 goroutine 还在运行,consumer goroutine 就应该继续存在;因此正确的做法是在主 goroutine 即将退出时(例如收到终止信号后)再关闭 consumer goroutine,而不是在它还在正常运行时就关闭它。
所以下面的解决方案有效
package main
import (
"event-service/pkg/pb"
"event-service/pkg/svc"
"fmt"
"log"
"sync"
)
func main() {
eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
log.Println(fmt.Sprintf("Got event %v from kafka", event))
}}
consumerConfig := map[string]interface{}{
"bootstrap.servers": "localhost:9092",
"group.id": "catalog-2",
"session.timeout.ms": 6000,
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"enable.partition.eof": true,
"auto.offset.reset": "earliest",
}
var wg sync.WaitGroup
kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
kafkaConsumer.Consume(&wg)
wg.Wait()
kafkaConsumer.Close()
}
服务定义
package svc
import (
"event-service/pkg/pb"
"fmt"
"github.com/gogo/protobuf/proto"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
"os/signal"
"sync"
"syscall"
)
// EventConsumer handles a single decoded event pulled from Kafka.
type EventConsumer func(event eventService.Event)
// KafkaConsumer wraps a confluent-kafka consumer and dispatches decoded
// events to per-entity-type EventConsumer callbacks.
type KafkaConsumer struct {
done chan bool // NOTE(review): unused in this version; sigChan drives shutdown
consumer *kafka.Consumer
consumerMapping map[string]EventConsumer // entity type -> handler
sigChan chan os.Signal // receives SIGINT/SIGTERM to stop the consume loop
channels []string // subscribed topic names
}
// getKafkaConsumerConfigMap converts a generic key/value config map into a
// *kafka.ConfigMap. Keys that fail to set are logged and skipped so that a
// single bad entry does not abort consumer construction.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
	configMap := &kafka.ConfigMap{}
	for key, value := range config {
		// log.Printf replaces log.Println(fmt.Sprintf(...)): same output, one call.
		if err := configMap.SetKey(key, value); err != nil {
			log.Printf("An error %v occurred while setting %v: %v", err, key, value)
		}
	}
	return configMap
}
// NewKafkaConsumer builds a KafkaConsumer subscribed to the given topics.
// Construction failures are fatal: the service cannot run without a consumer.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
	consumer, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
	// Check the constructor error immediately; the original interleaved
	// unrelated channel allocations between the call and its error check.
	if err != nil {
		log.Fatalf("An error %v occurred while starting kafka consumer.", err)
	}
	if err := consumer.SubscribeTopics(channels, nil); err != nil {
		log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
	}
	return &KafkaConsumer{
		channels:        channels,
		sigChan:         make(chan os.Signal, 1), // buffered so signal.Notify never drops the first signal
		done:            make(chan bool, 1),
		consumer:        consumer,
		consumerMapping: consumerMapping,
	}
}
// getEvent decodes a protobuf-encoded Event from raw Kafka message bytes.
// On decode failure the error is logged and the (possibly partially
// populated) event is still returned non-nil, matching callers that never
// nil-check the result.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
	event := eventService.Event{}
	if err := proto.Unmarshal(eventData, &event); err != nil {
		log.Printf("An error %v occurred while un marshalling data from kafka.", err)
	}
	return &event
}
// Consume registers for SIGINT/SIGTERM and starts the consumer loop in its
// own goroutine. wg is incremented once for the loop itself and once per
// in-flight message handler, so the caller's wg.Wait() blocks until a signal
// terminates the loop and all handlers have finished.
func (kc *KafkaConsumer) Consume(wg *sync.WaitGroup) {
	signal.Notify(kc.sigChan, syscall.SIGINT, syscall.SIGTERM)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case sig := <-kc.sigChan:
				fmt.Printf("Caught signal %v: terminating\n", sig)
				return
			case ev := <-kc.consumer.Events():
				switch e := ev.(type) {
				case kafka.AssignedPartitions:
					_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
					// BUG FIX: Assign/Unassign errors were silently discarded
					// with `_ =`; log them as the rest of the package does.
					if err := kc.consumer.Assign(e.Partitions); err != nil {
						log.Printf("An error %v occurred while assigning partitions.", err)
					}
				case kafka.RevokedPartitions:
					_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
					if err := kc.consumer.Unassign(); err != nil {
						log.Printf("An error %v occurred while unassigning partitions.", err)
					}
				case *kafka.Message:
					domainEvent := kc.getEvent(e.Value)
					wg.Add(1)
					// BUG FIX: the handler now uses its event parameter; the
					// original accepted the parameter but read the captured
					// domainEvent instead, leaving the parameter dead.
					go func(event *eventService.Event) {
						defer wg.Done()
						if eventConsumer := kc.consumerMapping[event.EntityType]; eventConsumer != nil {
							eventConsumer(*event)
						} else {
							log.Printf("Event consumer not found for %v event type", event.EntityType)
						}
					}(domainEvent)
				case kafka.PartitionEOF:
					fmt.Printf("%% Reached %v\n", e)
				case kafka.Error:
					// Errors should generally be considered as informational,
					// the client will try to automatically recover.
					_, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				}
			}
		}
	}()
}
// Close shuts down the underlying Kafka consumer, committing final offsets
// and leaving the consumer group. Callers must ensure the consume loop has
// already exited (wg.Wait in main) before invoking Close.
func (kc *KafkaConsumer) Close() {
	if err := kc.consumer.Close(); err != nil {
		log.Printf("An error %v occurred while closing kafka consumer.", err)
	}
}