Live video on the AT Protocol
1package bus
2
3import (
4 "sync"
5)
6
type (
	// Message is the payload carried over the bus; any value is accepted.
	Message any

	// Subscription is a channel of Message values belonging to one subscriber.
	Subscription chan Message
)
9
// Bus is a simple pub/sub system for backing websocket connections.
//
// Subscribers register per user key and receive every Message published
// under that key. The struct also carries segment-related state (segChans,
// segBuf) whose users are not part of this section of the file.
type Bus struct {
	// mu is held by Subscribe, Unsubscribe and Publish while they touch clients.
	mu sync.Mutex
	// clients maps a user key to the subscriptions currently registered for it.
	clients map[string][]Subscription
	// segChans maps a string key to a list of SegChan pointers.
	segChans map[string][]*SegChan
	// NOTE(review): presumably guards segChans — confirm against the methods that use it.
	segChansMutex sync.Mutex
	// segBuf maps a string key to a list of Seg pointers.
	segBuf map[string][]*Seg
	// NOTE(review): presumably guards segBuf (read/write lock) — confirm against the methods that use it.
	segBufMutex sync.RWMutex
}
19
20func NewBus() *Bus {
21 return &Bus{
22 clients: make(map[string][]Subscription),
23 segChans: make(map[string][]*SegChan),
24 segBuf: make(map[string][]*Seg),
25 }
26}
27
28func (b *Bus) Subscribe(user string) <-chan Message {
29 if b == nil {
30 return make(<-chan Message)
31 }
32 b.mu.Lock()
33 defer b.mu.Unlock()
34 ch := make(chan Message, 100)
35 b.clients[user] = append(b.clients[user], ch)
36 return ch
37}
38
39func (b *Bus) Unsubscribe(user string, ch <-chan Message) {
40 if b == nil {
41 return
42 }
43 b.mu.Lock()
44 defer b.mu.Unlock()
45
46 subs, ok := b.clients[user]
47 if !ok {
48 return
49 }
50
51 for i, sub := range subs {
52 if sub == ch {
53 // Remove the subscription by replacing it with the last element
54 // and then truncating the slice
55 subs[i] = subs[len(subs)-1]
56 b.clients[user] = subs[:len(subs)-1]
57 break
58 }
59 }
60}
61
62func (b *Bus) Publish(user string, msg Message) {
63 b.mu.Lock()
64 defer b.mu.Unlock()
65 subs, ok := b.clients[user]
66 if !ok {
67 return
68 }
69 for _, sub := range subs {
70 go func(sub Subscription) {
71 sub <- msg
72 }(sub)
73 }
74}