去没有互斥体的 PubSub?

Go PubSub without mutexes?

我将在网站后端实施通知系统,每次页面访问都会为用户订阅页面上显示的一些数据,当系统发生变化时,他会收到通知。例如,有人正在查看包含新闻文章的页面,当发布新文章时,我想通知用户,以便他可以通过 js 或重新加载页面来获取这些新文章。手动或自动。

为了做到这一点,我将以 pub/sub 的方式使用频道。因此,例如将有一个“新闻”频道。创建新文章时,该频道将收到该文章的 ID。当用户打开一个页面并订阅“新闻”频道(可能通过 websocket)时,必须有这个新闻频道的订阅者列表,他将被添加为订阅者以得到通知。

类似于:

type Channel struct {
  ingres <-chan int // news article id
  subs [] chan<- int
  mx sync.Mutex
}

其中每一个都会有一个 goroutine,它将分配进入 subs 列表的内容。

现在我正在研究的问题(可能是过早的优化)是会有很多频道和很多来来往往的订阅者。这意味着会有很多带有互斥体的世界停止事件。例如,如果有 10,000 个在线用户订阅新闻频道,goroutine 将不得不发送 10k 通知,同时 subs 切片将被锁定,因此新订阅者将不得不等待互斥体解锁。现在将其乘以 100 个频道,我认为我们遇到了问题。

所以我正在寻找一种方法来添加和删除订阅者,而不会阻止其他订阅者被添加或删除,或者可能只是为了在可接受的时间内全面接收通知。

“等待所有 sub 接收”的问题可以通过 goroutine 为每个 sub 超时解决,因此在收到 id 后,将创建 10k goroutine 并且可以立即解锁 mutex。但是,它仍然可以添加多个频道。

根据链接的评论,我想出了这段代码:

package notif

import (
    "context"
    "sync"
    "time"
    "unsafe"
)

type Client struct {
    recv   chan interface{}
    ch     *Channel
    o      sync.Once
    ctx    context.Context
    cancel context.CancelFunc
}

// will be nil if this client is write-only
func (c *Client) Listen() <-chan interface{} {
    return c.recv
}

func (c *Client) Close() {
    select {
    case <-c.ctx.Done():
    case c.ch.unsubscribe <- c:
    }
}

func (c *Client) Done() <-chan struct{} {
    return c.ctx.Done()
}

func (c *Client) doClose() {
    c.o.Do(func() {
        c.cancel()
        if c.recv != nil {
            close(c.recv)
        }
    })
}

func (c *Client) send(msg interface{}) {
    // write-only clients will not handle any messages
    if c.recv == nil {
        return
    }
    t := time.NewTimer(c.ch.sc)
    select {
    case <-c.ctx.Done():
    case c.recv <- msg:
    case <-t.C:
        // time out/slow consumer, close the connection
        c.Close()
    }
}

func (c *Client) Broadcast(payload interface{}) bool {
    select {
    case <-c.ctx.Done():
        return false
    default:
        c.ch.Broadcast() <- &envelope{Message: payload, Sender: uintptr(unsafe.Pointer(c))}
        return true
    }
}

type envelope struct {
    Message interface{}
    Sender  uintptr
}

// leech is channel-blocking so goroutine should be called internally to make it non-blocking
// this is to ensure proper order of leeched messages.
func NewChannel(ctx context.Context, name string, slowConsumer time.Duration, emptyCh chan string, leech func(interface{})) *Channel {
    return &Channel{
        name:        name,
        ingres:      make(chan interface{}, 1000),
        subscribe:   make(chan *Client, 1000),
        unsubscribe: make(chan *Client, 1000),
        aud:         make(map[*Client]struct{}, 1000),
        ctx:         ctx,
        sc:          slowConsumer,
        empty:       emptyCh,
        leech:       leech,
    }
}

type Channel struct {
    name        string
    ingres      chan interface{}
    subscribe   chan *Client
    unsubscribe chan *Client
    aud         map[*Client]struct{}
    ctx         context.Context
    sc          time.Duration
    empty       chan string
    leech       func(interface{})
}

func (ch *Channel) Id() string {
    return ch.name
}

// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode
// in which case the client will not be disconnected for being slow reader.
func (ch *Channel) Subscribe(writeOnly ...bool) *Client {
    c := &Client{
        ch: ch,
    }
    if len(writeOnly) == 0 || writeOnly[0] == false {
        c.recv = make(chan interface{})
    }
    c.ctx, c.cancel = context.WithCancel(ch.ctx)
    ch.subscribe <- c
    return c
}

func (ch *Channel) Broadcast() chan<- interface{} {
    return ch.ingres
}

// returns once context is cancelled
func (ch *Channel) Start() {
    for {
        select {
        case <-ch.ctx.Done():
            for cl := range ch.aud {
                delete(ch.aud, cl)
                cl.doClose()
            }
            return
        case cl := <-ch.subscribe:
            ch.aud[cl] = struct{}{}

        case cl := <-ch.unsubscribe:
            delete(ch.aud, cl)
            cl.doClose()
            if len(ch.aud) == 0 {
                ch.signalEmpty()
            }

        case msg := <-ch.ingres:
            e, ok := msg.(*envelope)
            if ok {
                msg = e.Message
            }
            for cl := range ch.aud {
                if ok == false || uintptr(unsafe.Pointer(cl)) != e.Sender {
                    go cl.send(e.Message)
                }
            }
            if ch.leech != nil {
                ch.leech(msg)
            }
        }
    }
}

func (ch *Channel) signalEmpty() {
    if ch.empty == nil {
        return
    }

    select {
    case ch.empty <- ch.name:
    default:
    }
}

type subscribeRequest struct {
    name string
    recv chan *Client
    wo   bool
}

type broadcastRequest struct {
    name string
    recv chan *Channel
}

type brokeredChannel struct {
    ch     *Channel
    cancel context.CancelFunc
}

type brokerLeech interface {
    Match(string) func(interface{})
}

func NewBroker(ctx context.Context, slowConsumer time.Duration, leech brokerLeech) *Broker {
    return &Broker{
        chans:     make(map[string]*brokeredChannel, 100),
        subscribe: make(chan *subscribeRequest, 10),
        broadcast: make(chan *broadcastRequest, 10),
        ctx:       ctx,
        sc:        slowConsumer,
        empty:     make(chan string, 10),
        leech:     leech,
    }
}

type Broker struct {
    chans     map[string]*brokeredChannel
    subscribe chan *subscribeRequest
    broadcast chan *broadcastRequest
    ctx       context.Context
    sc        time.Duration
    empty     chan string
    leech     brokerLeech
}

// returns once context is cancelled
func (b *Broker) Start() {
    for {
        select {
        case <-b.ctx.Done():
            return
        case req := <-b.subscribe:
            ch, ok := b.chans[req.name]
            if ok == false {
                ctx, cancel := context.WithCancel(b.ctx)
                var l func(interface{})
                if b.leech != nil {
                    l = b.leech.Match(req.name)
                }
                ch = &brokeredChannel{
                    ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),
                    cancel: cancel,
                }
                b.chans[req.name] = ch
                go ch.ch.Start()
            }
            req.recv <- ch.ch.Subscribe(req.wo)

        case req := <-b.broadcast:
            ch, ok := b.chans[req.name]
            if ok == false {
                ctx, cancel := context.WithCancel(b.ctx)
                var l func(interface{})
                if b.leech != nil {
                    l = b.leech.Match(req.name)
                }
                ch = &brokeredChannel{
                    ch:     NewChannel(ctx, req.name, b.sc, b.empty, l),
                    cancel: cancel,
                }
                b.chans[req.name] = ch
                go ch.ch.Start()
            }
            req.recv <- ch.ch

        case name := <-b.empty:
            if ch, ok := b.chans[name]; ok {
                ch.cancel()
                delete(b.chans, name)
            }
        }
    }
}

// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode
// in which case the client will not be disconnected for being slow reader.
func (b *Broker) Subscribe(name string, writeOnly ...bool) *Client {
    req := &subscribeRequest{
        name: name,
        recv: make(chan *Client),
        wo:   len(writeOnly) > 0 && writeOnly[0] == true,
    }
    b.subscribe <- req
    c := <-req.recv
    close(req.recv)
    return c
}

func (b *Broker) Broadcast(name string) chan<- interface{} {
    req := &broadcastRequest{name: name, recv: make(chan *Channel)}
    b.broadcast <- req
    ch := <-req.recv
    close(req.recv)
    return ch.ingres
}