Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/revert-dev-env 139 lines 3.4 kB view raw
1package bus 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "go.opentelemetry.io/otel" 9 "stream.place/streamplace/pkg/log" 10 "stream.place/streamplace/pkg/spmetrics" 11) 12 13// it's a segment channel manager, you see 14 15type Seg struct { 16 Filepath string 17 Data []byte 18 PacketizedData *PacketizedSegment 19} 20 21type PacketizedSegment struct { 22 Video [][]byte 23 Audio [][]byte 24 Duration time.Duration 25} 26 27var chanSize = 1024 28 29type SegChan struct { 30 C chan *Seg 31 Context context.Context 32} 33 34var bufSize = 10 35 36func segChanKey(user string, rendition string) string { 37 return fmt.Sprintf("%s::%s", user, rendition) 38} 39 40// get a channel to subscribe to new segments for a given user and rendition 41func (b *Bus) SubscribeSegment(ctx context.Context, user string, rendition string) *SegChan { 42 return b.SubscribeSegmentBuf(ctx, user, rendition, 0) 43} 44 45// get a channel to subscribe to new segments for a given user and rendition, 46// starting with bufSize cached segments that we already have 47func (b *Bus) SubscribeSegmentBuf(ctx context.Context, user string, rendition string, bufSize int) *SegChan { 48 key := segChanKey(user, rendition) 49 b.segChansMutex.Lock() 50 defer b.segChansMutex.Unlock() 51 chs, ok := b.segChans[key] 52 if !ok { 53 chs = []*SegChan{} 54 b.segChans[key] = chs 55 } 56 ch := make(chan *Seg) 57 b.segBufMutex.RLock() 58 defer b.segBufMutex.RUnlock() 59 curBuf, ok := b.segBuf[key] 60 myCh := make(chan *Seg, chanSize) 61 if ok { 62 if bufSize > len(curBuf) { 63 bufSize = len(curBuf) 64 } 65 for i := 0; i < bufSize; i += 1 { 66 myCh <- curBuf[len(curBuf)-bufSize+i] 67 } 68 } 69 segChan := &SegChan{C: ch, Context: ctx} 70 chs = append(chs, segChan) 71 b.segChans[key] = chs 72 spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs))) 73 return segChan 74} 75 76// unsubscribe from a channel for a given user and rendition 77func (b *Bus) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch *SegChan) { 78 key := segChanKey(user, rendition) 79 b.segChansMutex.Lock() 80 defer b.segChansMutex.Unlock() 81 chs, ok := b.segChans[key] 82 if !ok { 83 return 84 } 85 for i, c := range chs { 86 if c == ch { 87 chs = append(chs[:i], chs[i+1:]...) 88 break 89 } 90 } 91 spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs))) 92 b.segChans[key] = chs 93} 94 95func (b *Bus) PublishSegment(ctx context.Context, user string, rendition string, seg *Seg) { 96 ctx, span := otel.Tracer("signer").Start(ctx, "PublishSegment") 97 defer span.End() 98 key := segChanKey(user, rendition) 99 b.segChansMutex.Lock() 100 defer b.segChansMutex.Unlock() 101 b.segBufMutex.Lock() 102 defer b.segBufMutex.Unlock() 103 curBuf, ok := b.segBuf[key] 104 if !ok { 105 curBuf = []*Seg{} 106 b.segBuf[key] = curBuf 107 } 108 curBuf = append(curBuf, seg) 109 if len(curBuf) > bufSize { 110 curBuf = curBuf[1:] 111 } 112 b.segBuf[key] = curBuf 113 chs, ok := b.segChans[key] 114 if !ok { 115 return 116 } 117 for _, ch := range chs { 118 go func(segChan *SegChan) { 119 select { 120 case segChan.C <- seg: 121 case <-segChan.Context.Done(): 122 return 123 case <-time.After(1 * time.Minute): 124 log.Warn(ctx, "failed to send segment to channel, timing out", "user", user, "rendition", rendition) 125 } 126 127 }(ch) 128 } 129} 130 131func (b *Bus) EndSession(ctx context.Context, user string, rendition string) { 132 b.segChansMutex.Lock() 133 defer b.segChansMutex.Unlock() 134 b.segBufMutex.Lock() 135 defer b.segBufMutex.Unlock() 136 137 key := segChanKey(user, rendition) 138 delete(b.segBuf, key) 139}