Live video on the AT Protocol
1package bus
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "go.opentelemetry.io/otel"
9 "stream.place/streamplace/pkg/log"
10 "stream.place/streamplace/pkg/spmetrics"
11)
12
13// it's a segment channel manager, you see
14
// Seg is a single media segment passed through the bus: it is what
// PublishSegment caches and what subscribers receive on SegChan.C.
type Seg struct {
	// Filepath is a path associated with the segment.
	// NOTE(review): presumably where the segment lives on disk — confirm with producers.
	Filepath string
	// Data holds the segment's bytes.
	Data []byte
	// PacketizedData optionally carries the segment split into packets.
	// NOTE(review): nothing in this file populates it; confirm when it may be nil.
	PacketizedData *PacketizedSegment
}
20
// PacketizedSegment is the packet-level form of a segment, referenced from
// Seg.PacketizedData.
type PacketizedSegment struct {
	// Video holds the video payloads as a list of byte slices.
	// NOTE(review): presumably one entry per packet/frame — confirm with the packetizer.
	Video [][]byte
	// Audio holds the audio payloads as a list of byte slices (same caveat as Video).
	Audio [][]byte
	// Duration is the time span attributed to the segment.
	Duration time.Duration
}
26
// chanSize is the capacity of the buffered segment channel allocated in
// SubscribeSegmentBuf.
var chanSize = 1024
28
// SegChan is one subscription to a user/rendition segment stream, as returned
// by SubscribeSegment and SubscribeSegmentBuf.
type SegChan struct {
	// C is the channel on which PublishSegment delivers segments.
	C chan *Seg
	// Context is the subscriber's context; PublishSegment gives up on a pending
	// delivery once it is done.
	Context context.Context
}
33
// bufSize is the maximum number of recent segments PublishSegment keeps cached
// per user/rendition key. Note that SubscribeSegmentBuf takes a parameter with
// the same name, which shadows this variable inside that method.
var bufSize = 10
35
// segChanKey builds the map key used for both the subscriber lists and the
// segment cache of one user/rendition pair, in the form "<user>::<rendition>".
func segChanKey(user, rendition string) string {
	key := fmt.Sprintf("%s::%s", user, rendition)
	return key
}
39
40// get a channel to subscribe to new segments for a given user and rendition
41func (b *Bus) SubscribeSegment(ctx context.Context, user string, rendition string) *SegChan {
42 return b.SubscribeSegmentBuf(ctx, user, rendition, 0)
43}
44
45// get a channel to subscribe to new segments for a given user and rendition,
46// starting with bufSize cached segments that we already have
47func (b *Bus) SubscribeSegmentBuf(ctx context.Context, user string, rendition string, bufSize int) *SegChan {
48 key := segChanKey(user, rendition)
49 b.segChansMutex.Lock()
50 defer b.segChansMutex.Unlock()
51 chs, ok := b.segChans[key]
52 if !ok {
53 chs = []*SegChan{}
54 b.segChans[key] = chs
55 }
56 ch := make(chan *Seg)
57 b.segBufMutex.RLock()
58 defer b.segBufMutex.RUnlock()
59 curBuf, ok := b.segBuf[key]
60 myCh := make(chan *Seg, chanSize)
61 if ok {
62 if bufSize > len(curBuf) {
63 bufSize = len(curBuf)
64 }
65 for i := 0; i < bufSize; i += 1 {
66 myCh <- curBuf[len(curBuf)-bufSize+i]
67 }
68 }
69 segChan := &SegChan{C: ch, Context: ctx}
70 chs = append(chs, segChan)
71 b.segChans[key] = chs
72 spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs)))
73 return segChan
74}
75
76// unsubscribe from a channel for a given user and rendition
77func (b *Bus) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch *SegChan) {
78 key := segChanKey(user, rendition)
79 b.segChansMutex.Lock()
80 defer b.segChansMutex.Unlock()
81 chs, ok := b.segChans[key]
82 if !ok {
83 return
84 }
85 for i, c := range chs {
86 if c == ch {
87 chs = append(chs[:i], chs[i+1:]...)
88 break
89 }
90 }
91 spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs)))
92 b.segChans[key] = chs
93}
94
95func (b *Bus) PublishSegment(ctx context.Context, user string, rendition string, seg *Seg) {
96 ctx, span := otel.Tracer("signer").Start(ctx, "PublishSegment")
97 defer span.End()
98 key := segChanKey(user, rendition)
99 b.segChansMutex.Lock()
100 defer b.segChansMutex.Unlock()
101 b.segBufMutex.Lock()
102 defer b.segBufMutex.Unlock()
103 curBuf, ok := b.segBuf[key]
104 if !ok {
105 curBuf = []*Seg{}
106 b.segBuf[key] = curBuf
107 }
108 curBuf = append(curBuf, seg)
109 if len(curBuf) > bufSize {
110 curBuf = curBuf[1:]
111 }
112 b.segBuf[key] = curBuf
113 chs, ok := b.segChans[key]
114 if !ok {
115 return
116 }
117 for _, ch := range chs {
118 go func(segChan *SegChan) {
119 select {
120 case segChan.C <- seg:
121 case <-segChan.Context.Done():
122 return
123 case <-time.After(1 * time.Minute):
124 log.Warn(ctx, "failed to send segment to channel, timing out", "user", user, "rendition", rendition)
125 }
126
127 }(ch)
128 }
129}
130
131func (b *Bus) EndSession(ctx context.Context, user string, rendition string) {
132 b.segChansMutex.Lock()
133 defer b.segChansMutex.Unlock()
134 b.segBufMutex.Lock()
135 defer b.segBufMutex.Unlock()
136
137 key := segChanKey(user, rendition)
138 delete(b.segBuf, key)
139}