Live video on the AT Protocol
1package bus
2
3import (
4 "sync"
5)
6
type (
	// Message is the payload carried over the bus. It places no constraint
	// on the concrete type of the value being sent.
	Message any

	// Subscription is a bidirectional channel of Messages representing a
	// single subscriber's feed.
	Subscription chan Message
)
9
// Bus is a simple pub/sub system for backing websocket connections.
// Subscriptions are grouped under a string key (the "user"), and a key may
// have any number of subscriptions at once.
type Bus struct {
	mu      sync.Mutex                // held by every method that reads or writes clients
	clients map[string][]Subscription // user key -> subscriptions registered for that key
}
15
16func NewBus() *Bus {
17 return &Bus{
18 clients: make(map[string][]Subscription),
19 }
20}
21
22func (b *Bus) Subscribe(user string) <-chan Message {
23 b.mu.Lock()
24 defer b.mu.Unlock()
25 ch := make(chan Message, 100)
26 b.clients[user] = append(b.clients[user], ch)
27 return ch
28}
29
30func (b *Bus) Unsubscribe(user string, ch <-chan Message) {
31 b.mu.Lock()
32 defer b.mu.Unlock()
33
34 subs, ok := b.clients[user]
35 if !ok {
36 return
37 }
38
39 for i, sub := range subs {
40 if sub == ch {
41 // Remove the subscription by replacing it with the last element
42 // and then truncating the slice
43 subs[i] = subs[len(subs)-1]
44 b.clients[user] = subs[:len(subs)-1]
45 break
46 }
47 }
48}
49
50func (b *Bus) Publish(user string, msg Message) {
51 b.mu.Lock()
52 defer b.mu.Unlock()
53 for _, sub := range b.clients[user] {
54 go func(sub Subscription) {
55 sub <- msg
56 }(sub)
57 }
58}