Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.3 149 lines 3.6 kB view raw
1package bus 2 3import ( 4 "sync" 5) 6 7type Message any 8type Subscription chan Message 9 10type ViewerCountUpdate struct { 11 Streamer string 12 Count int 13 Origin string 14} 15 16// Bus is a simple pub/sub system for backing websocket connections 17type Bus struct { 18 mu sync.Mutex 19 clients map[string][]Subscription 20 segChans map[string][]*SegChan 21 segChansMutex sync.Mutex 22 segBuf map[string][]*Seg 23 segBufMutex sync.RWMutex 24 viewerCounts map[string]map[string]int 25 viewerCountsMutex sync.RWMutex 26 viewerCountSubscriptions []chan ViewerCountUpdate 27} 28 29func NewBus() *Bus { 30 return &Bus{ 31 clients: make(map[string][]Subscription), 32 segChans: make(map[string][]*SegChan), 33 segBuf: make(map[string][]*Seg), 34 viewerCounts: make(map[string]map[string]int), 35 viewerCountSubscriptions: []chan ViewerCountUpdate{}, 36 } 37} 38 39func (b *Bus) Subscribe(user string) <-chan Message { 40 if b == nil { 41 return make(<-chan Message) 42 } 43 b.mu.Lock() 44 defer b.mu.Unlock() 45 ch := make(chan Message, 100) 46 b.clients[user] = append(b.clients[user], ch) 47 return ch 48} 49 50func (b *Bus) Unsubscribe(user string, ch <-chan Message) { 51 if b == nil { 52 return 53 } 54 b.mu.Lock() 55 defer b.mu.Unlock() 56 57 subs, ok := b.clients[user] 58 if !ok { 59 return 60 } 61 62 for i, sub := range subs { 63 if sub == ch { 64 // Remove the subscription by replacing it with the last element 65 // and then truncating the slice 66 subs[i] = subs[len(subs)-1] 67 b.clients[user] = subs[:len(subs)-1] 68 break 69 } 70 } 71} 72 73func (b *Bus) SubscribeToViewerCount() <-chan ViewerCountUpdate { 74 b.viewerCountsMutex.Lock() 75 defer b.viewerCountsMutex.Unlock() 76 ch := make(chan ViewerCountUpdate, 100) 77 b.viewerCountSubscriptions = append(b.viewerCountSubscriptions, ch) 78 return ch 79} 80 81func (b *Bus) Publish(user string, msg Message) { 82 b.mu.Lock() 83 defer b.mu.Unlock() 84 subs, ok := b.clients[user] 85 if !ok { 86 return 87 } 88 for _, sub := range subs { 89 go func(sub Subscription) { 90 sub <- msg 91 }(sub) 92 } 93} 94 95func (b *Bus) GetViewerCount(user string) int { 96 b.viewerCountsMutex.RLock() 97 defer b.viewerCountsMutex.RUnlock() 98 streamerCounts, ok := b.viewerCounts[user] 99 if !ok { 100 return 0 101 } 102 count := 0 103 for _, viewers := range streamerCounts { 104 count += viewers 105 } 106 return count 107} 108 109func (b *Bus) SetViewerCount(user string, origin string, count int) { 110 b.viewerCountsMutex.Lock() 111 defer b.viewerCountsMutex.Unlock() 112 _, ok := b.viewerCounts[user] 113 if !ok { 114 b.viewerCounts[user] = make(map[string]int) 115 } 116 b.viewerCounts[user][origin] = count 117 b.notifyViewerCountSubscribers(user, count, origin) 118} 119 120func (b *Bus) IncrementViewerCount(user string, origin string) { 121 b.viewerCountsMutex.Lock() 122 defer b.viewerCountsMutex.Unlock() 123 _, ok := b.viewerCounts[user] 124 if !ok { 125 b.viewerCounts[user] = make(map[string]int) 126 } 127 b.viewerCounts[user][origin] += 1 128 b.notifyViewerCountSubscribers(user, b.viewerCounts[user][origin], origin) 129} 130 131func (b *Bus) DecrementViewerCount(user string, origin string) { 132 b.viewerCountsMutex.Lock() 133 defer b.viewerCountsMutex.Unlock() 134 _, ok := b.viewerCounts[user] 135 if !ok { 136 b.viewerCounts[user] = make(map[string]int) 137 } 138 b.viewerCounts[user][origin] -= 1 139 b.notifyViewerCountSubscribers(user, b.viewerCounts[user][origin], origin) 140} 141 142// only call if you're holding viewerCountsMutex 143func (b *Bus) notifyViewerCountSubscribers(user string, count int, origin string) { 144 for _, sub := range b.viewerCountSubscriptions { 145 go func() { 146 sub <- ViewerCountUpdate{Streamer: user, Count: count, Origin: origin} 147 }() 148 } 149}