Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.29 74 lines 1.4 kB view raw
1package bus 2 3import ( 4 "sync" 5) 6 7type Message any 8type Subscription chan Message 9 10// Bus is a simple pub/sub system for backing websocket connections 11type Bus struct { 12 mu sync.Mutex 13 clients map[string][]Subscription 14 segChans map[string][]*SegChan 15 segChansMutex sync.Mutex 16 segBuf map[string][]*Seg 17 segBufMutex sync.RWMutex 18} 19 20func NewBus() *Bus { 21 return &Bus{ 22 clients: make(map[string][]Subscription), 23 segChans: make(map[string][]*SegChan), 24 segBuf: make(map[string][]*Seg), 25 } 26} 27 28func (b *Bus) Subscribe(user string) <-chan Message { 29 if b == nil { 30 return make(<-chan Message) 31 } 32 b.mu.Lock() 33 defer b.mu.Unlock() 34 ch := make(chan Message, 100) 35 b.clients[user] = append(b.clients[user], ch) 36 return ch 37} 38 39func (b *Bus) Unsubscribe(user string, ch <-chan Message) { 40 if b == nil { 41 return 42 } 43 b.mu.Lock() 44 defer b.mu.Unlock() 45 46 subs, ok := b.clients[user] 47 if !ok { 48 return 49 } 50 51 for i, sub := range subs { 52 if sub == ch { 53 // Remove the subscription by replacing it with the last element 54 // and then truncating the slice 55 subs[i] = subs[len(subs)-1] 56 b.clients[user] = subs[:len(subs)-1] 57 break 58 } 59 } 60} 61 62func (b *Bus) Publish(user string, msg Message) { 63 b.mu.Lock() 64 defer b.mu.Unlock() 65 subs, ok := b.clients[user] 66 if !ok { 67 return 68 } 69 for _, sub := range subs { 70 go func(sub Subscription) { 71 sub <- msg 72 }(sub) 73 } 74}