Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.4 139 lines 3.4 kB view raw
1package bus 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "go.opentelemetry.io/otel" 9 "stream.place/streamplace/pkg/log" 10 "stream.place/streamplace/pkg/spmetrics" 11) 12 13// it's a segment channel manager, you see 14 15type Seg struct { 16 Filepath string 17 Data []byte 18 PacketizedData *PacketizedSegment 19} 20 21type PacketizedSegment struct { 22 Video [][]byte 23 Audio [][]byte 24 Duration time.Duration 25} 26 27var chanSize = 1024 28 29type SegChan struct { 30 C chan *Seg 31 Context context.Context 32} 33 34var bufSize = 10 35 36func segChanKey(user string, rendition string) string { 37 return fmt.Sprintf("%s::%s", user, rendition) 38} 39 40// get a channel to subscribe to new segments for a given user and rendition 41func (b *Bus) SubscribeSegment(ctx context.Context, user string, rendition string) *SegChan { 42 return b.SubscribeSegmentBuf(ctx, user, rendition, 0) 43} 44 45// get a channel to subscribe to new segments for a given user and rendition, 46// starting with bufSize cached segments that we already have 47func (b *Bus) SubscribeSegmentBuf(ctx context.Context, user string, rendition string, bufSize int) *SegChan { 48 key := segChanKey(user, rendition) 49 b.segChansMutex.Lock() 50 defer b.segChansMutex.Unlock() 51 chs, ok := b.segChans[key] 52 if !ok { 53 chs = []*SegChan{} 54 b.segChans[key] = chs 55 } 56 ch := make(chan *Seg) 57 b.segBufMutex.RLock() 58 defer b.segBufMutex.RUnlock() 59 curBuf, ok := b.segBuf[key] 60 myCh := make(chan *Seg, chanSize) 61 if ok { 62 if bufSize > len(curBuf) { 63 bufSize = len(curBuf) 64 } 65 for i := 0; i < bufSize; i += 1 { 66 myCh <- curBuf[len(curBuf)-bufSize+i] 67 } 68 } 69 segChan := &SegChan{C: ch, Context: ctx} 70 chs = append(chs, segChan) 71 b.segChans[key] = chs 72 spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs))) 73 return segChan 74} 75 76// unsubscribe from a channel for a given user and rendition 77func (b *Bus) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch *SegChan) { 78 key := segChanKey(user, rendition) 79 b.segChansMutex.Lock() 80 defer b.segChansMutex.Unlock() 81 chs, ok := b.segChans[key] 82 if !ok { 83 return 84 } 85 for i, c := range chs { 86 if c == ch { 87 chs = append(chs[:i], chs[i+1:]...) 88 break 89 } 90 } 91 spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs))) 92 b.segChans[key] = chs 93} 94 95func (b *Bus) PublishSegment(ctx context.Context, user string, rendition string, seg *Seg) { 96 ctx, span := otel.Tracer("signer").Start(ctx, "PublishSegment") 97 defer span.End() 98 key := segChanKey(user, rendition) 99 b.segChansMutex.Lock() 100 defer b.segChansMutex.Unlock() 101 b.segBufMutex.Lock() 102 defer b.segBufMutex.Unlock() 103 curBuf, ok := b.segBuf[key] 104 if !ok { 105 curBuf = []*Seg{} 106 b.segBuf[key] = curBuf 107 } 108 curBuf = append(curBuf, seg) 109 if len(curBuf) > bufSize { 110 curBuf = curBuf[1:] 111 } 112 b.segBuf[key] = curBuf 113 chs, ok := b.segChans[key] 114 if !ok { 115 return 116 } 117 for _, ch := range chs { 118 go func(segChan *SegChan) { 119 select { 120 case segChan.C <- seg: 121 case <-segChan.Context.Done(): 122 return 123 case <-time.After(1 * time.Minute): 124 log.Warn(ctx, "failed to send segment to channel, timing out", "user", user, "rendition", rendition) 125 } 126 127 }(ch) 128 } 129} 130 131func (b *Bus) EndSession(ctx context.Context, user string, rendition string) { 132 b.segChansMutex.Lock() 133 defer b.segChansMutex.Unlock() 134 b.segBufMutex.Lock() 135 defer b.segBufMutex.Unlock() 136 137 key := segChanKey(user, rendition) 138 delete(b.segBuf, key) 139}