为什么这个 websocket 服务器不广播到连接的客户端?

Why this websocket server does not broadcast to connected client?

下面的代码是一个 websocket 广播服务器,它从一个特定的连接中读取,然后将它广播到连接的客户端。

但是尽管没有错误和警告,但此服务器不广播。 为什么这个服务器不广播?

在此代码中 self.KabucomConn 是原始套接字,因此从该套接字读取,然后广播到存储在 Hub.RClients 中的客户端。

当建立新的连接时,将带有注册通道的连接对象传递给 Hub,然后 Hub 添加一个客户端到 RClient 存储的客户端对象。

package main

import (
    "log"
    "net/http"
    "net/url"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

type Client struct {
    hub  *Hub
    Conn *websocket.Conn
    Send chan []byte
}

func (self *Client) writepump() {
    for {
        select {
        case message := <-self.Send:
            w, err := self.Conn.NextWriter(websocket.TextMessage)
            if err != nil {
                log.Print("writepump: nextwriter error")
            }
            w.Write(message)
            w.Close()
        }
    }
}

type Hub struct {
    RClients    map[*Client]bool
    KabucomConn *websocket.Conn
    register    chan *Client
    unregister  chan *Client
    kabucomchan chan []byte
    url         url.URL
}

func (self *Hub) KabucomRun() {
    for {
        _, b, err := self.KabucomConn.ReadMessage() // read data from origin data connection
        log.Println("read message")
        if err != nil {
            log.Println(err)
            self.KabucomConn.Close()
            for i := 1; i <= 5; i++ { //retry re-connect up to 5 times
                self.KabucomConn, _, err = websocket.DefaultDialer.Dial(self.url.String(), nil)
                if i >= 5 && err != nil {
                    log.Fatal(err)
                } else if err != nil {
                    log.Println(err, "try", i)
                    continue
                } else {
                    break
                }
            }
            log.Println("conti")
            continue
        }
        log.Println(b)
        self.kabucomchan <- b
    }
}

func (self *Hub) Run() {
    defer func() {
        for c, _ := range self.RClients {
            close(c.Send)
            c.Conn.Close()
        }
    }()
    for {
        select {
        case message := <-self.kabucomchan:
            log.Println("kabucomchan")
            log.Println(message)
            for c, _ := range self.RClients {
                c.Send <- message
            }
        case c := <-self.register:
            log.Println("reg")
            self.RClients[c] = true

        case c := <-self.unregister:
            log.Println("unreg")
            delete(self.RClients, c)
            close(c.Send)
        }
    }
}

func newHub() *Hub {
    u := url.URL{Scheme: "ws", Host: "192.168.1.8:20063", Path: "/ws"}
    var conn *websocket.Conn
    for i := 1; i <= 5; i++ {
        d, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
        if err != nil && i < 5 {
            log.Println(err)
            continue
        } else if i >= 5 {
            log.Println("Hub: Kabucom connection error")
        }
        conn = d
        break
    }
    return &Hub{RClients: make(map[*Client]bool), register: make(chan *Client), KabucomConn: conn}
}

func handler(w http.ResponseWriter, r *http.Request, hub *Hub) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }
    client := &Client{Conn: conn, hub: hub, Send: make(chan []byte, 256)}
    go client.writepump()
    hub.register <- client
}

func main() {
    hub := newHub()
    go hub.Run()
    go hub.KabucomRun()
    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        handler(w, r, hub)
    })
    log.Println(":20021/ws")
    http.ListenAndServe(":20021", nil)
}
  

因为你没有初始化Hub.kabucomchan

func newHub() *Hub {
    //...
    return &Hub{RClients: make(map[*Client]bool), register: make(chan *Client), KabucomConn: conn, /* kabucomchan ??? */}
}

通道上的发送和接收操作假定发送方 c<- 和接收方 <-c 都持有对同一通道的引用,但是当通道为 nil 时,此引用不会存在,发送和接收永远阻塞。

Hub构造函数中正确初始化通道:

return &Hub{
   RClients: make(map[*Client]bool), 
   register: make(chan *Client), 
   KabucomConn: conn,
   kabucomchan: make(chan []byte, /* buffered? */), // <--- !
}