Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/fix-type-export 70 lines 1.3 kB view raw
1package bus 2 3import ( 4 "sync" 5) 6 7type Message any 8type Subscription chan Message 9 10// Bus is a simple pub/sub system for backing websocket connections 11type Bus struct { 12 mu sync.Mutex 13 clients map[string][]Subscription 14 segChans map[string][]*SegChan 15 segChansMutex sync.Mutex 16 segBuf map[string][]*Seg 17 segBufMutex sync.RWMutex 18} 19 20func NewBus() *Bus { 21 return &Bus{ 22 clients: make(map[string][]Subscription), 23 segChans: make(map[string][]*SegChan), 24 segBuf: make(map[string][]*Seg), 25 } 26} 27 28func (b *Bus) Subscribe(user string) <-chan Message { 29 if b == nil { 30 return make(<-chan Message) 31 } 32 b.mu.Lock() 33 defer b.mu.Unlock() 34 ch := make(chan Message, 100) 35 b.clients[user] = append(b.clients[user], ch) 36 return ch 37} 38 39func (b *Bus) Unsubscribe(user string, ch <-chan Message) { 40 if b == nil { 41 return 42 } 43 b.mu.Lock() 44 defer b.mu.Unlock() 45 46 subs, ok := b.clients[user] 47 if !ok { 48 return 49 } 50 51 for i, sub := range subs { 52 if sub == ch { 53 // Remove the subscription by replacing it with the last element 54 // and then truncating the slice 55 subs[i] = subs[len(subs)-1] 56 b.clients[user] = subs[:len(subs)-1] 57 break 58 } 59 } 60} 61 62func (b *Bus) Publish(user string, msg Message) { 63 b.mu.Lock() 64 defer b.mu.Unlock() 65 for _, sub := range b.clients[user] { 66 go func(sub Subscription) { 67 sub <- msg 68 }(sub) 69 } 70}