去没有互斥体的 PubSub?
Go PubSub without mutexes?
我将在网站后端实施通知系统,每次页面访问都会为用户订阅页面上显示的一些数据,当系统发生变化时,他会收到通知。例如,有人正在查看包含新闻文章的页面,当发布新文章时,我想通知用户,以便他可以通过 js 或重新加载页面来获取这些新文章。手动或自动。
为了做到这一点,我将以 pub/sub 的方式使用频道。因此,例如将有一个“新闻”频道。创建新文章时,该频道将收到该文章的 ID。当用户打开一个页面并订阅“新闻”频道(可能通过 websocket)时,必须有这个新闻频道的订阅者列表,他将被添加为订阅者以得到通知。
类似于:
type Channel struct {
ingres <-chan int // news article id
subs [] chan<- int
mx sync.Mutex
}
其中每一个都会有一个 goroutine,它将分配进入 subs 列表的内容。
现在我正在研究的问题(可能是过早的优化)是会有很多频道和很多来来往往的订阅者。这意味着会有很多带有互斥体的世界停止事件。例如,如果有 10,000 个在线用户订阅新闻频道,goroutine 将不得不发送 10k 通知,同时 subs 切片将被锁定,因此新订阅者将不得不等待互斥体解锁。现在将其乘以 100 个频道,我认为我们遇到了问题。
所以我正在寻找一种方法来添加和删除订阅者,而不会阻止其他订阅者被添加或删除,或者可能只是为了在可接受的时间内全面接收通知。
“等待所有 sub 接收”的问题可以通过 goroutine 为每个 sub 超时解决,因此在收到 id 后,将创建 10k goroutine 并且可以立即解锁 mutex。但是,它仍然可以添加多个频道。
根据链接的评论,我想出了这段代码:
package notif
import (
"context"
"sync"
"time"
"unsafe"
)
type Client struct {
recv chan interface{}
ch *Channel
o sync.Once
ctx context.Context
cancel context.CancelFunc
}
// will be nil if this client is write-only
func (c *Client) Listen() <-chan interface{} {
return c.recv
}
func (c *Client) Close() {
select {
case <-c.ctx.Done():
case c.ch.unsubscribe <- c:
}
}
func (c *Client) Done() <-chan struct{} {
return c.ctx.Done()
}
func (c *Client) doClose() {
c.o.Do(func() {
c.cancel()
if c.recv != nil {
close(c.recv)
}
})
}
func (c *Client) send(msg interface{}) {
// write-only clients will not handle any messages
if c.recv == nil {
return
}
t := time.NewTimer(c.ch.sc)
select {
case <-c.ctx.Done():
case c.recv <- msg:
case <-t.C:
// time out/slow consumer, close the connection
c.Close()
}
}
func (c *Client) Broadcast(payload interface{}) bool {
select {
case <-c.ctx.Done():
return false
default:
c.ch.Broadcast() <- &envelope{Message: payload, Sender: uintptr(unsafe.Pointer(c))}
return true
}
}
type envelope struct {
Message interface{}
Sender uintptr
}
// leech is channel-blocking so goroutine should be called internally to make it non-blocking
// this is to ensure proper order of leeched messages.
func NewChannel(ctx context.Context, name string, slowConsumer time.Duration, emptyCh chan string, leech func(interface{})) *Channel {
return &Channel{
name: name,
ingres: make(chan interface{}, 1000),
subscribe: make(chan *Client, 1000),
unsubscribe: make(chan *Client, 1000),
aud: make(map[*Client]struct{}, 1000),
ctx: ctx,
sc: slowConsumer,
empty: emptyCh,
leech: leech,
}
}
type Channel struct {
name string
ingres chan interface{}
subscribe chan *Client
unsubscribe chan *Client
aud map[*Client]struct{}
ctx context.Context
sc time.Duration
empty chan string
leech func(interface{})
}
func (ch *Channel) Id() string {
return ch.name
}
// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode
// in which case the client will not be disconnected for being slow reader.
func (ch *Channel) Subscribe(writeOnly ...bool) *Client {
c := &Client{
ch: ch,
}
if len(writeOnly) == 0 || writeOnly[0] == false {
c.recv = make(chan interface{})
}
c.ctx, c.cancel = context.WithCancel(ch.ctx)
ch.subscribe <- c
return c
}
func (ch *Channel) Broadcast() chan<- interface{} {
return ch.ingres
}
// returns once context is cancelled
func (ch *Channel) Start() {
for {
select {
case <-ch.ctx.Done():
for cl := range ch.aud {
delete(ch.aud, cl)
cl.doClose()
}
return
case cl := <-ch.subscribe:
ch.aud[cl] = struct{}{}
case cl := <-ch.unsubscribe:
delete(ch.aud, cl)
cl.doClose()
if len(ch.aud) == 0 {
ch.signalEmpty()
}
case msg := <-ch.ingres:
e, ok := msg.(*envelope)
if ok {
msg = e.Message
}
for cl := range ch.aud {
if ok == false || uintptr(unsafe.Pointer(cl)) != e.Sender {
go cl.send(e.Message)
}
}
if ch.leech != nil {
ch.leech(msg)
}
}
}
}
func (ch *Channel) signalEmpty() {
if ch.empty == nil {
return
}
select {
case ch.empty <- ch.name:
default:
}
}
type subscribeRequest struct {
name string
recv chan *Client
wo bool
}
type broadcastRequest struct {
name string
recv chan *Channel
}
type brokeredChannel struct {
ch *Channel
cancel context.CancelFunc
}
type brokerLeech interface {
Match(string) func(interface{})
}
func NewBroker(ctx context.Context, slowConsumer time.Duration, leech brokerLeech) *Broker {
return &Broker{
chans: make(map[string]*brokeredChannel, 100),
subscribe: make(chan *subscribeRequest, 10),
broadcast: make(chan *broadcastRequest, 10),
ctx: ctx,
sc: slowConsumer,
empty: make(chan string, 10),
leech: leech,
}
}
type Broker struct {
chans map[string]*brokeredChannel
subscribe chan *subscribeRequest
broadcast chan *broadcastRequest
ctx context.Context
sc time.Duration
empty chan string
leech brokerLeech
}
// returns once context is cancelled
func (b *Broker) Start() {
for {
select {
case <-b.ctx.Done():
return
case req := <-b.subscribe:
ch, ok := b.chans[req.name]
if ok == false {
ctx, cancel := context.WithCancel(b.ctx)
var l func(interface{})
if b.leech != nil {
l = b.leech.Match(req.name)
}
ch = &brokeredChannel{
ch: NewChannel(ctx, req.name, b.sc, b.empty, l),
cancel: cancel,
}
b.chans[req.name] = ch
go ch.ch.Start()
}
req.recv <- ch.ch.Subscribe(req.wo)
case req := <-b.broadcast:
ch, ok := b.chans[req.name]
if ok == false {
ctx, cancel := context.WithCancel(b.ctx)
var l func(interface{})
if b.leech != nil {
l = b.leech.Match(req.name)
}
ch = &brokeredChannel{
ch: NewChannel(ctx, req.name, b.sc, b.empty, l),
cancel: cancel,
}
b.chans[req.name] = ch
go ch.ch.Start()
}
req.recv <- ch.ch
case name := <-b.empty:
if ch, ok := b.chans[name]; ok {
ch.cancel()
delete(b.chans, name)
}
}
}
}
// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode
// in which case the client will not be disconnected for being slow reader.
func (b *Broker) Subscribe(name string, writeOnly ...bool) *Client {
req := &subscribeRequest{
name: name,
recv: make(chan *Client),
wo: len(writeOnly) > 0 && writeOnly[0] == true,
}
b.subscribe <- req
c := <-req.recv
close(req.recv)
return c
}
func (b *Broker) Broadcast(name string) chan<- interface{} {
req := &broadcastRequest{name: name, recv: make(chan *Channel)}
b.broadcast <- req
ch := <-req.recv
close(req.recv)
return ch.ingres
}
我将在网站后端实施通知系统,每次页面访问都会为用户订阅页面上显示的一些数据,当系统发生变化时,他会收到通知。例如,有人正在查看包含新闻文章的页面,当发布新文章时,我想通知用户,以便他可以通过 js 或重新加载页面来获取这些新文章。手动或自动。
为了做到这一点,我将以 pub/sub 的方式使用频道。因此,例如将有一个“新闻”频道。创建新文章时,该频道将收到该文章的 ID。当用户打开一个页面并订阅“新闻”频道(可能通过 websocket)时,必须有这个新闻频道的订阅者列表,他将被添加为订阅者以得到通知。
类似于:
type Channel struct {
ingres <-chan int // news article id
subs [] chan<- int
mx sync.Mutex
}
其中每一个都会有一个 goroutine,它将分配进入 subs 列表的内容。
现在我正在研究的问题(可能是过早的优化)是会有很多频道和很多来来往往的订阅者。这意味着会有很多带有互斥体的世界停止事件。例如,如果有 10,000 个在线用户订阅新闻频道,goroutine 将不得不发送 10k 通知,同时 subs 切片将被锁定,因此新订阅者将不得不等待互斥体解锁。现在将其乘以 100 个频道,我认为我们遇到了问题。
所以我正在寻找一种方法来添加和删除订阅者,而不会阻止其他订阅者被添加或删除,或者可能只是为了在可接受的时间内全面接收通知。
“等待所有 sub 接收”的问题可以通过 goroutine 为每个 sub 超时解决,因此在收到 id 后,将创建 10k goroutine 并且可以立即解锁 mutex。但是,它仍然可以添加多个频道。
根据链接的评论,我想出了这段代码:
package notif
import (
"context"
"sync"
"time"
"unsafe"
)
type Client struct {
recv chan interface{}
ch *Channel
o sync.Once
ctx context.Context
cancel context.CancelFunc
}
// will be nil if this client is write-only
func (c *Client) Listen() <-chan interface{} {
return c.recv
}
func (c *Client) Close() {
select {
case <-c.ctx.Done():
case c.ch.unsubscribe <- c:
}
}
func (c *Client) Done() <-chan struct{} {
return c.ctx.Done()
}
func (c *Client) doClose() {
c.o.Do(func() {
c.cancel()
if c.recv != nil {
close(c.recv)
}
})
}
func (c *Client) send(msg interface{}) {
// write-only clients will not handle any messages
if c.recv == nil {
return
}
t := time.NewTimer(c.ch.sc)
select {
case <-c.ctx.Done():
case c.recv <- msg:
case <-t.C:
// time out/slow consumer, close the connection
c.Close()
}
}
func (c *Client) Broadcast(payload interface{}) bool {
select {
case <-c.ctx.Done():
return false
default:
c.ch.Broadcast() <- &envelope{Message: payload, Sender: uintptr(unsafe.Pointer(c))}
return true
}
}
type envelope struct {
Message interface{}
Sender uintptr
}
// leech is channel-blocking so goroutine should be called internally to make it non-blocking
// this is to ensure proper order of leeched messages.
func NewChannel(ctx context.Context, name string, slowConsumer time.Duration, emptyCh chan string, leech func(interface{})) *Channel {
return &Channel{
name: name,
ingres: make(chan interface{}, 1000),
subscribe: make(chan *Client, 1000),
unsubscribe: make(chan *Client, 1000),
aud: make(map[*Client]struct{}, 1000),
ctx: ctx,
sc: slowConsumer,
empty: emptyCh,
leech: leech,
}
}
type Channel struct {
name string
ingres chan interface{}
subscribe chan *Client
unsubscribe chan *Client
aud map[*Client]struct{}
ctx context.Context
sc time.Duration
empty chan string
leech func(interface{})
}
func (ch *Channel) Id() string {
return ch.name
}
// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode
// in which case the client will not be disconnected for being slow reader.
func (ch *Channel) Subscribe(writeOnly ...bool) *Client {
c := &Client{
ch: ch,
}
if len(writeOnly) == 0 || writeOnly[0] == false {
c.recv = make(chan interface{})
}
c.ctx, c.cancel = context.WithCancel(ch.ctx)
ch.subscribe <- c
return c
}
func (ch *Channel) Broadcast() chan<- interface{} {
return ch.ingres
}
// returns once context is cancelled
func (ch *Channel) Start() {
for {
select {
case <-ch.ctx.Done():
for cl := range ch.aud {
delete(ch.aud, cl)
cl.doClose()
}
return
case cl := <-ch.subscribe:
ch.aud[cl] = struct{}{}
case cl := <-ch.unsubscribe:
delete(ch.aud, cl)
cl.doClose()
if len(ch.aud) == 0 {
ch.signalEmpty()
}
case msg := <-ch.ingres:
e, ok := msg.(*envelope)
if ok {
msg = e.Message
}
for cl := range ch.aud {
if ok == false || uintptr(unsafe.Pointer(cl)) != e.Sender {
go cl.send(e.Message)
}
}
if ch.leech != nil {
ch.leech(msg)
}
}
}
}
func (ch *Channel) signalEmpty() {
if ch.empty == nil {
return
}
select {
case ch.empty <- ch.name:
default:
}
}
type subscribeRequest struct {
name string
recv chan *Client
wo bool
}
type broadcastRequest struct {
name string
recv chan *Channel
}
type brokeredChannel struct {
ch *Channel
cancel context.CancelFunc
}
type brokerLeech interface {
Match(string) func(interface{})
}
func NewBroker(ctx context.Context, slowConsumer time.Duration, leech brokerLeech) *Broker {
return &Broker{
chans: make(map[string]*brokeredChannel, 100),
subscribe: make(chan *subscribeRequest, 10),
broadcast: make(chan *broadcastRequest, 10),
ctx: ctx,
sc: slowConsumer,
empty: make(chan string, 10),
leech: leech,
}
}
type Broker struct {
chans map[string]*brokeredChannel
subscribe chan *subscribeRequest
broadcast chan *broadcastRequest
ctx context.Context
sc time.Duration
empty chan string
leech brokerLeech
}
// returns once context is cancelled
func (b *Broker) Start() {
for {
select {
case <-b.ctx.Done():
return
case req := <-b.subscribe:
ch, ok := b.chans[req.name]
if ok == false {
ctx, cancel := context.WithCancel(b.ctx)
var l func(interface{})
if b.leech != nil {
l = b.leech.Match(req.name)
}
ch = &brokeredChannel{
ch: NewChannel(ctx, req.name, b.sc, b.empty, l),
cancel: cancel,
}
b.chans[req.name] = ch
go ch.ch.Start()
}
req.recv <- ch.ch.Subscribe(req.wo)
case req := <-b.broadcast:
ch, ok := b.chans[req.name]
if ok == false {
ctx, cancel := context.WithCancel(b.ctx)
var l func(interface{})
if b.leech != nil {
l = b.leech.Match(req.name)
}
ch = &brokeredChannel{
ch: NewChannel(ctx, req.name, b.sc, b.empty, l),
cancel: cancel,
}
b.chans[req.name] = ch
go ch.ch.Start()
}
req.recv <- ch.ch
case name := <-b.empty:
if ch, ok := b.chans[name]; ok {
ch.cancel()
delete(b.chans, name)
}
}
}
}
// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode
// in which case the client will not be disconnected for being slow reader.
func (b *Broker) Subscribe(name string, writeOnly ...bool) *Client {
req := &subscribeRequest{
name: name,
recv: make(chan *Client),
wo: len(writeOnly) > 0 && writeOnly[0] == true,
}
b.subscribe <- req
c := <-req.recv
close(req.recv)
return c
}
func (b *Broker) Broadcast(name string) chan<- interface{} {
req := &broadcastRequest{name: name, recv: make(chan *Channel)}
b.broadcast <- req
ch := <-req.recv
close(req.recv)
return ch.ingres
}