Live video on the AT Protocol
at natb/command-errors 149 lines 3.6 kB view raw
1package bus 2 3import ( 4 "sync" 5) 6 7type Message any 8type Subscription chan Message 9 10type ViewerCountUpdate struct { 11 Streamer string 12 Count int 13 Origin string 14} 15 16// Bus is a simple pub/sub system for backing websocket connections 17type Bus struct { 18 mu sync.Mutex 19 clients map[string][]Subscription 20 segChans map[string][]*SegChan 21 segChansMutex sync.Mutex 22 segBuf map[string][]*Seg 23 segBufMutex sync.RWMutex 24 viewerCounts map[string]map[string]int 25 viewerCountsMutex sync.RWMutex 26 viewerCountSubscriptions []chan ViewerCountUpdate 27} 28 29func NewBus() *Bus { 30 return &Bus{ 31 clients: make(map[string][]Subscription), 32 segChans: make(map[string][]*SegChan), 33 segBuf: make(map[string][]*Seg), 34 viewerCounts: make(map[string]map[string]int), 35 viewerCountSubscriptions: []chan ViewerCountUpdate{}, 36 } 37} 38 39func (b *Bus) Subscribe(user string) <-chan Message { 40 if b == nil { 41 return make(<-chan Message) 42 } 43 b.mu.Lock() 44 defer b.mu.Unlock() 45 ch := make(chan Message, 100) 46 b.clients[user] = append(b.clients[user], ch) 47 return ch 48} 49 50func (b *Bus) Unsubscribe(user string, ch <-chan Message) { 51 if b == nil { 52 return 53 } 54 b.mu.Lock() 55 defer b.mu.Unlock() 56 57 subs, ok := b.clients[user] 58 if !ok { 59 return 60 } 61 62 for i, sub := range subs { 63 if sub == ch { 64 // Remove the subscription by replacing it with the last element 65 // and then truncating the slice 66 subs[i] = subs[len(subs)-1] 67 b.clients[user] = subs[:len(subs)-1] 68 break 69 } 70 } 71} 72 73func (b *Bus) SubscribeToViewerCount() <-chan ViewerCountUpdate { 74 b.viewerCountsMutex.Lock() 75 defer b.viewerCountsMutex.Unlock() 76 ch := make(chan ViewerCountUpdate, 100) 77 b.viewerCountSubscriptions = append(b.viewerCountSubscriptions, ch) 78 return ch 79} 80 81func (b *Bus) Publish(user string, msg Message) { 82 b.mu.Lock() 83 defer b.mu.Unlock() 84 subs, ok := b.clients[user] 85 if !ok { 86 return 87 } 88 for _, sub := range subs { 89 go func(sub Subscription) { 90 sub <- msg 91 }(sub) 92 } 93} 94 95func (b *Bus) GetViewerCount(user string) int { 96 b.viewerCountsMutex.RLock() 97 defer b.viewerCountsMutex.RUnlock() 98 streamerCounts, ok := b.viewerCounts[user] 99 if !ok { 100 return 0 101 } 102 count := 0 103 for _, viewers := range streamerCounts { 104 count += viewers 105 } 106 return count 107} 108 109func (b *Bus) SetViewerCount(user string, origin string, count int) { 110 b.viewerCountsMutex.Lock() 111 defer b.viewerCountsMutex.Unlock() 112 _, ok := b.viewerCounts[user] 113 if !ok { 114 b.viewerCounts[user] = make(map[string]int) 115 } 116 b.viewerCounts[user][origin] = count 117 b.notifyViewerCountSubscribers(user, count, origin) 118} 119 120func (b *Bus) IncrementViewerCount(user string, origin string) { 121 b.viewerCountsMutex.Lock() 122 defer b.viewerCountsMutex.Unlock() 123 _, ok := b.viewerCounts[user] 124 if !ok { 125 b.viewerCounts[user] = make(map[string]int) 126 } 127 b.viewerCounts[user][origin] += 1 128 b.notifyViewerCountSubscribers(user, b.viewerCounts[user][origin], origin) 129} 130 131func (b *Bus) DecrementViewerCount(user string, origin string) { 132 b.viewerCountsMutex.Lock() 133 defer b.viewerCountsMutex.Unlock() 134 _, ok := b.viewerCounts[user] 135 if !ok { 136 b.viewerCounts[user] = make(map[string]int) 137 } 138 b.viewerCounts[user][origin] -= 1 139 b.notifyViewerCountSubscribers(user, b.viewerCounts[user][origin], origin) 140} 141 142// only call if you're holding viewerCountsMutex 143func (b *Bus) notifyViewerCountSubscribers(user string, count int, origin string) { 144 for _, sub := range b.viewerCountSubscriptions { 145 go func() { 146 sub <- ViewerCountUpdate{Streamer: user, Count: count, Origin: origin} 147 }() 148 } 149}