Live video on the AT Protocol
1package bus
2
3import (
4 "sync"
5)
6
7type Message any
8type Subscription chan Message
9
10type ViewerCountUpdate struct {
11 Streamer string
12 Count int
13 Origin string
14}
15
16// Bus is a simple pub/sub system for backing websocket connections
17type Bus struct {
18 mu sync.Mutex
19 clients map[string][]Subscription
20 segChans map[string][]*SegChan
21 segChansMutex sync.Mutex
22 segBuf map[string][]*Seg
23 segBufMutex sync.RWMutex
24 viewerCounts map[string]map[string]int
25 viewerCountsMutex sync.RWMutex
26 viewerCountSubscriptions []chan ViewerCountUpdate
27}
28
29func NewBus() *Bus {
30 return &Bus{
31 clients: make(map[string][]Subscription),
32 segChans: make(map[string][]*SegChan),
33 segBuf: make(map[string][]*Seg),
34 viewerCounts: make(map[string]map[string]int),
35 viewerCountSubscriptions: []chan ViewerCountUpdate{},
36 }
37}
38
39func (b *Bus) Subscribe(user string) <-chan Message {
40 if b == nil {
41 return make(<-chan Message)
42 }
43 b.mu.Lock()
44 defer b.mu.Unlock()
45 ch := make(chan Message, 100)
46 b.clients[user] = append(b.clients[user], ch)
47 return ch
48}
49
50func (b *Bus) Unsubscribe(user string, ch <-chan Message) {
51 if b == nil {
52 return
53 }
54 b.mu.Lock()
55 defer b.mu.Unlock()
56
57 subs, ok := b.clients[user]
58 if !ok {
59 return
60 }
61
62 for i, sub := range subs {
63 if sub == ch {
64 // Remove the subscription by replacing it with the last element
65 // and then truncating the slice
66 subs[i] = subs[len(subs)-1]
67 b.clients[user] = subs[:len(subs)-1]
68 break
69 }
70 }
71}
72
73func (b *Bus) SubscribeToViewerCount() <-chan ViewerCountUpdate {
74 b.viewerCountsMutex.Lock()
75 defer b.viewerCountsMutex.Unlock()
76 ch := make(chan ViewerCountUpdate, 100)
77 b.viewerCountSubscriptions = append(b.viewerCountSubscriptions, ch)
78 return ch
79}
80
81func (b *Bus) Publish(user string, msg Message) {
82 b.mu.Lock()
83 defer b.mu.Unlock()
84 subs, ok := b.clients[user]
85 if !ok {
86 return
87 }
88 for _, sub := range subs {
89 go func(sub Subscription) {
90 sub <- msg
91 }(sub)
92 }
93}
94
95func (b *Bus) GetViewerCount(user string) int {
96 b.viewerCountsMutex.RLock()
97 defer b.viewerCountsMutex.RUnlock()
98 streamerCounts, ok := b.viewerCounts[user]
99 if !ok {
100 return 0
101 }
102 count := 0
103 for _, viewers := range streamerCounts {
104 count += viewers
105 }
106 return count
107}
108
109func (b *Bus) SetViewerCount(user string, origin string, count int) {
110 b.viewerCountsMutex.Lock()
111 defer b.viewerCountsMutex.Unlock()
112 _, ok := b.viewerCounts[user]
113 if !ok {
114 b.viewerCounts[user] = make(map[string]int)
115 }
116 b.viewerCounts[user][origin] = count
117 b.notifyViewerCountSubscribers(user, count, origin)
118}
119
120func (b *Bus) IncrementViewerCount(user string, origin string) {
121 b.viewerCountsMutex.Lock()
122 defer b.viewerCountsMutex.Unlock()
123 _, ok := b.viewerCounts[user]
124 if !ok {
125 b.viewerCounts[user] = make(map[string]int)
126 }
127 b.viewerCounts[user][origin] += 1
128 b.notifyViewerCountSubscribers(user, b.viewerCounts[user][origin], origin)
129}
130
131func (b *Bus) DecrementViewerCount(user string, origin string) {
132 b.viewerCountsMutex.Lock()
133 defer b.viewerCountsMutex.Unlock()
134 _, ok := b.viewerCounts[user]
135 if !ok {
136 b.viewerCounts[user] = make(map[string]int)
137 }
138 b.viewerCounts[user][origin] -= 1
139 b.notifyViewerCountSubscribers(user, b.viewerCounts[user][origin], origin)
140}
141
142// only call if you're holding viewerCountsMutex
143func (b *Bus) notifyViewerCountSubscribers(user string, count int, origin string) {
144 for _, sub := range b.viewerCountSubscriptions {
145 go func() {
146 sub <- ViewerCountUpdate{Streamer: user, Count: count, Origin: origin}
147 }()
148 }
149}