Live video on the AT Protocol
at eli/postgres 74 lines 1.4 kB view raw
1package bus 2 3import ( 4 "sync" 5) 6 7type Message any 8type Subscription chan Message 9 10// Bus is a simple pub/sub system for backing websocket connections 11type Bus struct { 12 mu sync.Mutex 13 clients map[string][]Subscription 14 segChans map[string][]*SegChan 15 segChansMutex sync.Mutex 16 segBuf map[string][]*Seg 17 segBufMutex sync.RWMutex 18} 19 20func NewBus() *Bus { 21 return &Bus{ 22 clients: make(map[string][]Subscription), 23 segChans: make(map[string][]*SegChan), 24 segBuf: make(map[string][]*Seg), 25 } 26} 27 28func (b *Bus) Subscribe(user string) <-chan Message { 29 if b == nil { 30 return make(<-chan Message) 31 } 32 b.mu.Lock() 33 defer b.mu.Unlock() 34 ch := make(chan Message, 100) 35 b.clients[user] = append(b.clients[user], ch) 36 return ch 37} 38 39func (b *Bus) Unsubscribe(user string, ch <-chan Message) { 40 if b == nil { 41 return 42 } 43 b.mu.Lock() 44 defer b.mu.Unlock() 45 46 subs, ok := b.clients[user] 47 if !ok { 48 return 49 } 50 51 for i, sub := range subs { 52 if sub == ch { 53 // Remove the subscription by replacing it with the last element 54 // and then truncating the slice 55 subs[i] = subs[len(subs)-1] 56 b.clients[user] = subs[:len(subs)-1] 57 break 58 } 59 } 60} 61 62func (b *Bus) Publish(user string, msg Message) { 63 b.mu.Lock() 64 defer b.mu.Unlock() 65 subs, ok := b.clients[user] 66 if !ok { 67 return 68 } 69 for _, sub := range subs { 70 go func(sub Subscription) { 71 sub <- msg 72 }(sub) 73 } 74}