在我的代码中安全完成 goroutines 的正确方法是什么?

What is the right way to safely finish goroutines in my code?

我正在写一个简单的 tcp 服务器,goroutine 模型非常简单:

一个goroutine负责接受新的连接;对于每个新连接,启动三个 goroutines :

  1. 一个已读
  2. 一个用于处理和处理应用程序逻辑
  3. 一个用于写入

目前一台服务器将服务不超过 1000 个用户,所以我不尝试限制 goroutine 数量。

for {
    conn, err := listener.Accept()
    // ....
    connHandler := connHandler{
        conn:      conn,
        done:      make(chan struct{}),
        readChan:  make(chan string, 100),
        writeChan: make(chan string, 100),
    }
    // ....
    go connHandler.readAll()    
    go connHandler.processAll() 
    go connHandler.writeAll()   
}

我使用done通道通知所有三个通道完成,当用户注销或发生永久性网络错误时,done通道将关闭(使用sync.Once使确保关闭只发生一次):

func (connHandler *connHandler) Close() {
    connHandler.doOnce.Do(func() {
        connHandler.isClosed = true
        close(connHandler.done)
    })
}

下面是writeAll()方法的代码:

func (connHandler *connHandler) writeAll() {
    writer := bufio.NewWriter(connHandler.conn)

    for {
        select {
        case <-connHandler.done:
            connHandler.conn.Close()
            return
        case msg := <-connHandler.writeChan:
            connHandler.writeOne(msg, writer)
        }
    }
}

有一种 Send 方法可以通过向写入通道发送字符串来向用户发送消息:

func (connHandler *connHandler) Send(msg string) {
    case connHandler.writeChan <- msg:
}

Send方法主要会在processAll() goroutine中调用,但也会在许多其他goroutine中调用,因为不同的用户需要相互通信。

现在的问题是:如果用户 A 退出或网络失败,用户 B 向用户 A 发送消息,用户 B 的 goroutine 可能会被永久阻塞,因为没有人会从该频道收到消息。

我的解决方案:

我的第一个想法是使用布尔值来确保 connHanler 在发送给它时没有关闭:

func (connHandler *connHandler) Send(msg string) {
    if !connHandler.isClosed {
        connHandler.writeChan <- msg
    }
}

不过我觉得connHandler.writeChan <- msgclose(done)还是可以同时发生的,阻塞的可能性还是存在的。所以我必须添加一个超时:

func (connHandler *connHandler) Send(msg string) {
    if !connHandler.isClosed {
        timer := time.NewTimer(10 * time.Second)
        defer timer.Stop()
        select {
        case connHandler.writeChan <- msg:
        case <-timer.C:
            log.Warning(connHandler.Addr() + " send msg timeout:" + msg)
        }
    }

}

现在感觉代码很安全,但是也很丑,而且每次发送消息都要启动一个定时器,感觉是一个不必要的开销。

然后我看了这篇文章:https://go101.org/article/channel-closing.html,我的问题看起来像文章中的第二个例子:

One receiver, N senders, the receiver says "please stop sending more" by closing an additional signal channel

但我认为这种解决方案不能消除我的情况下阻塞的可能性。

也许最简单的解决方案是关闭写入通道并让 Send 方法崩溃,然后使用 recover 来处理崩溃?但这看起来也很丑陋。

那么有没有一种简单直接的方法来完成我想做的事情呢?

(本人英语不好,如有不明白之处,请指出,谢谢。)

你的例子看起来很不错,我认为你已经得到了你需要的 90%。

我认为您看到的问题是发送,而实际上您可能 "done"。

您可以使用 "done" 渠道通知 所有 您已完成的 go 例程。您将始终能够从关闭的通道中读取一个值(它将是零值)。这意味着您可以更新 Send(msg) 方法以考虑已完成的频道。

func (connHandler *connHandler) Send(msg string) {
    select {
    case connHandler.writeChan <- msg:
    case <- connHandler.done:
        log.Debug("connHandler is done, exiting Send without sending.")
    case <-time.After(10 * time.Second):
        log.Warning(connHandler.Addr() + " send msg timeout:" + msg)
    }
}

现在 select 会发生以下情况之一:

  1. 消息发送到writeChan
  2. close(done)已在别处调用,done chan已关闭。您将能够从 done 开始阅读,打破 select.
  3. time.After(...) 将达到 10 秒,您将能够使发送超时。