Live video on the AT Protocol

refactor: move segchanman into the bus

+151 -161
+1 -1
pkg/api/api_internal.go
··· 146 146 errors.WriteHTTPBadRequest(w, "rendition required", nil) 147 147 return 148 148 } 149 - seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition) 149 + seg := <-a.Bus.SubscribeSegment(ctx, user, rendition) 150 150 base := filepath.Base(seg.Filepath) 151 151 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 152 152 w.WriteHeader(301)
+6 -3
pkg/bus/bus.go
··· 9 9 10 10 // Bus is a simple pub/sub system for backing websocket connections 11 11 type Bus struct { 12 - mu sync.Mutex 13 - clients map[string][]Subscription 12 + mu sync.Mutex 13 + clients map[string][]Subscription 14 + segChans map[string][]chan *Seg 15 + segChansMutex sync.Mutex 14 16 } 15 17 16 18 func NewBus() *Bus { 17 19 return &Bus{ 18 - clients: make(map[string][]Subscription), 20 + clients: make(map[string][]Subscription), 21 + segChans: make(map[string][]chan *Seg), 19 22 } 20 23 } 21 24
+87
pkg/bus/segchanman.go
··· 1 + package bus 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "time" 7 + 8 + "go.opentelemetry.io/otel" 9 + "stream.place/streamplace/pkg/log" 10 + "stream.place/streamplace/pkg/spmetrics" 11 + ) 12 + 13 + // it's a segment channel manager, you see 14 + 15 + type Seg struct { 16 + Filepath string 17 + Data []byte 18 + PacketizedData *PacketizedSegment 19 + } 20 + 21 + type PacketizedSegment struct { 22 + Video [][]byte 23 + Audio [][]byte 24 + Duration time.Duration 25 + } 26 + 27 + func segChanKey(user string, rendition string) string { 28 + return fmt.Sprintf("%s::%s", user, rendition) 29 + } 30 + 31 + func (b *Bus) SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *Seg { 32 + key := segChanKey(user, rendition) 33 + b.segChansMutex.Lock() 34 + defer b.segChansMutex.Unlock() 35 + chs, ok := b.segChans[key] 36 + if !ok { 37 + chs = []chan *Seg{} 38 + b.segChans[key] = chs 39 + } 40 + ch := make(chan *Seg) 41 + chs = append(chs, ch) 42 + b.segChans[key] = chs 43 + spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs))) 44 + return ch 45 + } 46 + 47 + func (b *Bus) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *Seg) { 48 + key := segChanKey(user, rendition) 49 + b.segChansMutex.Lock() 50 + defer b.segChansMutex.Unlock() 51 + chs, ok := b.segChans[key] 52 + if !ok { 53 + return 54 + } 55 + for i, c := range chs { 56 + if c == ch { 57 + chs = append(chs[:i], chs[i+1:]...) 58 + break 59 + } 60 + } 61 + spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs))) 62 + b.segChans[key] = chs 63 + } 64 + 65 + func (b *Bus) PublishSegment(ctx context.Context, user string, rendition string, seg *Seg) { 66 + ctx, span := otel.Tracer("signer").Start(ctx, "PublishSegment") 67 + defer span.End() 68 + key := segChanKey(user, rendition) 69 + b.segChansMutex.Lock() 70 + defer b.segChansMutex.Unlock() 71 + chs, ok := b.segChans[key] 72 + if !ok { 73 + return 74 + } 75 + for _, ch := range chs { 76 + go func(ch chan *Seg) { 77 + select { 78 + case ch <- seg: 79 + case <-ctx.Done(): 80 + return 81 + case <-time.After(1 * time.Minute): 82 + log.Warn(ctx, "failed to send segment to channel, timing out", "user", user, "rendition", rendition) 83 + } 84 + 85 + }(ch) 86 + } 87 + }
+1
pkg/director/director.go
··· 66 66 bus: d.bus, 67 67 segmentChan: make(chan struct{}), 68 68 op: d.op, 69 + packets: make([]bus.PacketizedSegment, 0), 69 70 } 70 71 d.streamSessions[not.Segment.RepoDID] = ss 71 72 g.Go(func() error {
+26 -7
pkg/director/stream_session.go
··· 19 19 "stream.place/streamplace/pkg/livepeer" 20 20 "stream.place/streamplace/pkg/log" 21 21 "stream.place/streamplace/pkg/media" 22 - "stream.place/streamplace/pkg/media/segchanman" 23 22 "stream.place/streamplace/pkg/model" 24 23 "stream.place/streamplace/pkg/renditions" 25 24 "stream.place/streamplace/pkg/spmetrics" ··· 40 39 lastStatus time.Time 41 40 lastStatusLock sync.Mutex 42 41 g *errgroup.Group 42 + packets []bus.PacketizedSegment 43 43 } 44 44 45 45 func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { ··· 139 139 140 140 ss.bus.Publish(spseg.Creator, spseg) 141 141 ss.Go(ctx, func() error { 142 - return ss.AddToHLS(ctx, spseg, "source", not.Data) 142 + return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 143 + Filepath: not.Segment.ID, 144 + Data: not.Data, 145 + }) 143 146 }) 144 147 145 148 if ss.cli.Thumbnail { ··· 390 393 return fmt.Errorf("failed to write transcoded segment file: %w", err) 391 394 } 392 395 ss.Go(ctx, func() error { 393 - return ss.AddToHLS(ctx, spseg, rs[i].Name, seg) 394 - }) 395 - ss.Go(ctx, func() error { 396 - ss.mm.PublishSegment(ctx, spseg.Creator, rs[i].Name, &segchanman.Seg{ 396 + return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 397 397 Filepath: fd.Name(), 398 398 Data: seg, 399 399 }) 400 - return nil 401 400 }) 402 401 403 402 } 403 + return nil 404 + } 405 + 406 + func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 407 + ss.Go(ctx, func() error { 408 + return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 409 + }) 410 + ss.Go(ctx, func() error { 411 + return ss.AddToWebRTC(ctx, spseg, rendition, seg) 412 + }) 413 + return nil 414 + } 415 + 416 + func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 417 + packet, err := media.Packetize(ctx, seg) 418 + if err != nil { 419 + return fmt.Errorf("failed to packetize segment: %w", err) 420 + } 421 + seg.PacketizedData = packet 422 + ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 404 423 return nil 405 424 } 406 425
+3 -3
pkg/media/concat.go
··· 11 11 12 12 "github.com/go-gst/go-gst/gst" 13 13 "github.com/go-gst/go-gst/gst/app" 14 + "stream.place/streamplace/pkg/bus" 14 15 "stream.place/streamplace/pkg/log" 15 - "stream.place/streamplace/pkg/media/segchanman" 16 16 ) 17 17 18 18 type ConcatStreamer interface { 19 - SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg 20 - UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg) 19 + SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *bus.Seg 20 + UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *bus.Seg) 21 21 } 22 22 23 23 // This function remains in scope for the duration of a single users' playback
+3 -3
pkg/media/concat2.go
··· 6 6 "fmt" 7 7 8 8 "github.com/go-gst/go-gst/gst" 9 + "stream.place/streamplace/pkg/bus" 9 10 "stream.place/streamplace/pkg/log" 10 - "stream.place/streamplace/pkg/media/segchanman" 11 11 ) 12 12 13 13 var ErrConcatDone = errors.New("concat done") 14 14 15 - func ConcatBin(ctx context.Context, segCh <-chan *segchanman.Seg) (*gst.Bin, error) { 15 + func ConcatBin(ctx context.Context, segCh <-chan *bus.Seg) (*gst.Bin, error) { 16 16 ctx = log.WithLogValues(ctx, "func", "ConcatBin") 17 17 bin := gst.NewBin("concat-bin") 18 18 ··· 132 132 return bin, nil 133 133 } 134 134 135 - func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *segchanman.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad) error { 135 + func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *bus.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad) error { 136 136 137 137 log.Debug(ctx, "adding concat demuxer", "seg", seg.Filepath) 138 138 demuxBin, err := ConcatDemuxBin(ctx, seg)
+4 -4
pkg/media/concat2_test.go
··· 15 15 "github.com/stretchr/testify/require" 16 16 "go.uber.org/goleak" 17 17 "golang.org/x/sync/errgroup" 18 + "stream.place/streamplace/pkg/bus" 18 19 "stream.place/streamplace/pkg/gstinit" 19 20 "stream.place/streamplace/pkg/log" 20 - "stream.place/streamplace/pkg/media/segchanman" 21 21 ) 22 22 23 23 func TestConcatBin(t *testing.T) { ··· 84 84 return fmt.Errorf("failed to read fixture file: %w", err) 85 85 } 86 86 87 - testSegs := []*segchanman.Seg{} 87 + testSegs := []*bus.Seg{} 88 88 for range 5 { 89 - testSegs = append(testSegs, &segchanman.Seg{ 89 + testSegs = append(testSegs, &bus.Seg{ 90 90 Data: bs, 91 91 Filepath: filename, 92 92 }) 93 93 } 94 94 95 - segCh := make(chan *segchanman.Seg) 95 + segCh := make(chan *bus.Seg) 96 96 go func() { 97 97 for _, seg := range testSegs { 98 98 segCh <- seg
+2 -2
pkg/media/concat_demux.go
··· 8 8 9 9 "github.com/go-gst/go-gst/gst" 10 10 "github.com/go-gst/go-gst/gst/app" 11 + "stream.place/streamplace/pkg/bus" 11 12 "stream.place/streamplace/pkg/log" 12 - "stream.place/streamplace/pkg/media/segchanman" 13 13 ) 14 14 15 15 // silly technique to avoid leaking pads 16 16 func doNothing(self *gst.Element, pad *gst.Pad) {} 17 17 18 - func ConcatDemuxBin(ctx context.Context, seg *segchanman.Seg) (*gst.Bin, error) { 18 + func ConcatDemuxBin(ctx context.Context, seg *bus.Seg) (*gst.Bin, error) { 19 19 ctx = log.WithLogValues(ctx, "func", "SegDemuxBin") 20 20 bin := gst.NewBin("seg-demux-bin") 21 21
+2 -2
pkg/media/concat_demux_test.go
··· 13 13 "github.com/stretchr/testify/require" 14 14 "go.uber.org/goleak" 15 15 "golang.org/x/sync/errgroup" 16 + "stream.place/streamplace/pkg/bus" 16 17 "stream.place/streamplace/pkg/gstinit" 17 18 "stream.place/streamplace/pkg/log" 18 - "stream.place/streamplace/pkg/media/segchanman" 19 19 ) 20 20 21 21 func TestConcatDemuxBin(t *testing.T) { ··· 79 79 return fmt.Errorf("failed to read fixture file: %w", err) 80 80 } 81 81 82 - testSeg := &segchanman.Seg{ 82 + testSeg := &bus.Seg{ 83 83 Data: bs, 84 84 Filepath: filename, 85 85 }
-17
pkg/media/media.go
··· 19 19 "stream.place/streamplace/pkg/bus" 20 20 "stream.place/streamplace/pkg/config" 21 21 "stream.place/streamplace/pkg/gstinit" 22 - "stream.place/streamplace/pkg/media/segchanman" 23 22 "stream.place/streamplace/pkg/model" 24 23 25 24 "stream.place/streamplace/pkg/replication" ··· 35 34 36 35 type MediaManager struct { 37 36 cli *config.CLI 38 - segChanMan *segchanman.SegChanMan 39 37 replicator replication.Replicator 40 38 hlsRunning map[string]*M3U8 41 39 hlsRunningMut sync.Mutex ··· 116 114 } 117 115 return &MediaManager{ 118 116 cli: cli, 119 - segChanMan: segchanman.MakeSegChanMan(), 120 117 replicator: rep, 121 118 hlsRunning: map[string]*M3U8{}, 122 119 httpPipes: map[string]io.Writer{}, ··· 160 157 defer mm.newSegmentSubsMutex.Unlock() 161 158 mm.newSegmentSubs = append(mm.newSegmentSubs, ch) 162 159 return ch 163 - } 164 - 165 - // subscribe to the latest segments from a given user for livestreaming purposes 166 - func (mm *MediaManager) SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg { 167 - return mm.segChanMan.SubscribeSegment(ctx, user, rendition) 168 - } 169 - 170 - func (mm *MediaManager) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg) { 171 - mm.segChanMan.UnsubscribeSegment(ctx, user, rendition, ch) 172 - } 173 - 174 - // subscribe to the latest segments from a given user for livestreaming purposes 175 - func (mm *MediaManager) PublishSegment(ctx context.Context, user, rendition string, seg *segchanman.Seg) { 176 - mm.segChanMan.PublishSegment(ctx, user, rendition, seg) 177 160 } 178 161 179 162 type obj map[string]any
+3 -9
pkg/media/packetize.go
··· 8 8 9 9 "github.com/go-gst/go-gst/gst" 10 10 "github.com/go-gst/go-gst/gst/app" 11 + "stream.place/streamplace/pkg/bus" 11 12 "stream.place/streamplace/pkg/log" 12 - "stream.place/streamplace/pkg/media/segchanman" 13 13 ) 14 14 15 - type PacketizedSegment struct { 16 - Video [][]byte 17 - Audio [][]byte 18 - Duration time.Duration 19 - } 20 - 21 15 // take in a segment and return a bunch of packets suitable for webrtc 22 - func Packetize(ctx context.Context, seg *segchanman.Seg) (*PacketizedSegment, error) { 16 + func Packetize(ctx context.Context, seg *bus.Seg) (*bus.PacketizedSegment, error) { 23 17 24 18 pipelineSlice := []string{ 25 19 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink sync=false name=videoappsink", ··· 197 191 return nil, fmt.Errorf("pipeline error: %w", err) 198 192 } 199 193 200 - return &PacketizedSegment{ 194 + return &bus.PacketizedSegment{ 201 195 Video: videoOutput, 202 196 Audio: audioOutput, 203 197 Duration: segDur,
+2 -2
pkg/media/packetize_test.go
··· 10 10 "github.com/stretchr/testify/require" 11 11 "go.uber.org/goleak" 12 12 "golang.org/x/sync/errgroup" 13 + "stream.place/streamplace/pkg/bus" 13 14 "stream.place/streamplace/pkg/gstinit" 14 - "stream.place/streamplace/pkg/media/segchanman" 15 15 ) 16 16 17 17 func TestPacketize(t *testing.T) { ··· 41 41 bs, err := io.ReadAll(inputFile) 42 42 require.NoError(t, err) 43 43 44 - testSeg := &segchanman.Seg{ 44 + testSeg := &bus.Seg{ 45 45 Data: bs, 46 46 Filepath: filename, 47 47 }
-92
pkg/media/segchanman/segchanman.go
··· 1 - package segchanman 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "sync" 7 - "time" 8 - 9 - "go.opentelemetry.io/otel" 10 - "stream.place/streamplace/pkg/log" 11 - "stream.place/streamplace/pkg/spmetrics" 12 - ) 13 - 14 - // it's a segment channel manager, you see 15 - 16 - type Seg struct { 17 - Filepath string 18 - Data []byte 19 - } 20 - 21 - type SegChanMan struct { 22 - segChans map[string][]chan *Seg 23 - segChansMutex sync.Mutex 24 - } 25 - 26 - func MakeSegChanMan() *SegChanMan { 27 - return &SegChanMan{ 28 - segChans: make(map[string][]chan *Seg), 29 - } 30 - } 31 - 32 - func segChanKey(user string, rendition string) string { 33 - return fmt.Sprintf("%s::%s", user, rendition) 34 - } 35 - 36 - func (s *SegChanMan) SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *Seg { 37 - key := segChanKey(user, rendition) 38 - s.segChansMutex.Lock() 39 - defer s.segChansMutex.Unlock() 40 - chs, ok := s.segChans[key] 41 - if !ok { 42 - chs = []chan *Seg{} 43 - s.segChans[key] = chs 44 - } 45 - ch := make(chan *Seg) 46 - chs = append(chs, ch) 47 - s.segChans[key] = chs 48 - spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs))) 49 - return ch 50 - } 51 - 52 - func (s *SegChanMan) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *Seg) { 53 - key := segChanKey(user, rendition) 54 - s.segChansMutex.Lock() 55 - defer s.segChansMutex.Unlock() 56 - chs, ok := s.segChans[key] 57 - if !ok { 58 - return 59 - } 60 - for i, c := range chs { 61 - if c == ch { 62 - chs = append(chs[:i], chs[i+1:]...) 63 - break 64 - } 65 - } 66 - spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs))) 67 - s.segChans[key] = chs 68 - } 69 - 70 - func (s *SegChanMan) PublishSegment(ctx context.Context, user string, rendition string, seg *Seg) { 71 - ctx, span := otel.Tracer("signer").Start(ctx, "PublishSegment") 72 - defer span.End() 73 - key := segChanKey(user, rendition) 74 - s.segChansMutex.Lock() 75 - defer s.segChansMutex.Unlock() 76 - chs, ok := s.segChans[key] 77 - if !ok { 78 - return 79 - } 80 - for _, ch := range chs { 81 - go func(ch chan *Seg) { 82 - select { 83 - case ch <- seg: 84 - case <-ctx.Done(): 85 - return 86 - case <-time.After(1 * time.Minute): 87 - log.Warn(ctx, "failed to send segment to channel, timing out", "user", user, "rendition", rendition) 88 - } 89 - 90 - }(ch) 91 - } 92 - }
+1 -1
pkg/media/segmenter_hls.go
··· 26 26 return fmt.Errorf("error creating ToHLS pipeline: %w", err) 27 27 } 28 28 29 - outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 29 + outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm.bus) 30 30 if err != nil { 31 31 return fmt.Errorf("failed to get output queue: %w", err) 32 32 }
-6
pkg/media/validate.go
··· 12 12 "stream.place/streamplace/pkg/constants" 13 13 "stream.place/streamplace/pkg/crypto/signers" 14 14 "stream.place/streamplace/pkg/log" 15 - "stream.place/streamplace/pkg/media/segchanman" 16 15 "stream.place/streamplace/pkg/model" 17 16 18 17 "git.stream.place/streamplace/c2pa-go/pkg/c2pa" ··· 80 79 if _, err := io.Copy(fd, r); err != nil { 81 80 return err 82 81 } 83 - scmSeg := &segchanman.Seg{ 84 - Filepath: fd.Name(), 85 - Data: buf, 86 - } 87 - go mm.PublishSegment(ctx, repoDID, "source", scmSeg) 88 82 seg := &model.Segment{ 89 83 ID: *mani.Label, 90 84 SigningKeyDID: signingKeyDID,
+5 -5
pkg/media/webrtc_playback.go
··· 11 11 "github.com/google/uuid" 12 12 "github.com/pion/webrtc/v4" 13 13 "github.com/pion/webrtc/v4/pkg/media" 14 + "stream.place/streamplace/pkg/bus" 14 15 "stream.place/streamplace/pkg/log" 15 - "stream.place/streamplace/pkg/media/segchanman" 16 16 "stream.place/streamplace/pkg/spmetrics" 17 17 ) 18 18 ··· 41 41 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 42 42 } 43 43 44 - segBuffer := make(chan *segchanman.Seg, 1024) 44 + segBuffer := make(chan *bus.Seg, 1024) 45 45 go func() { 46 - ch := mm.SubscribeSegment(ctx, user, rendition) 47 - defer mm.UnsubscribeSegment(ctx, user, rendition, ch) 46 + ch := mm.bus.SubscribeSegment(ctx, user, rendition) 47 + defer mm.bus.UnsubscribeSegment(ctx, user, rendition, ch) 48 48 for { 49 49 select { 50 50 case <-ctx.Done(): ··· 57 57 } 58 58 }() 59 59 60 - segCh := make(chan *segchanman.Seg) 60 + segCh := make(chan *bus.Seg) 61 61 go func() { 62 62 for { 63 63 select {
+5 -4
pkg/media/webrtc_playback2.go
··· 9 9 "github.com/pion/webrtc/v4" 10 10 "github.com/pion/webrtc/v4/pkg/media" 11 11 "golang.org/x/sync/errgroup" 12 + "stream.place/streamplace/pkg/bus" 12 13 "stream.place/streamplace/pkg/log" 13 14 "stream.place/streamplace/pkg/spmetrics" 14 15 ) ··· 91 92 } 92 93 }() 93 94 94 - packetQueue := make(chan *PacketizedSegment, 1024) 95 + packetQueue := make(chan *bus.PacketizedSegment, 1024) 95 96 go func() { 96 - ch := mm.SubscribeSegment(ctx, user, rendition) 97 - defer mm.UnsubscribeSegment(ctx, user, rendition, ch) 97 + ch := mm.bus.SubscribeSegment(ctx, user, rendition) 98 + defer mm.bus.UnsubscribeSegment(ctx, user, rendition, ch) 98 99 for { 99 100 select { 100 101 case <-ctx.Done(): ··· 123 124 124 125 p1 := <-packetQueue 125 126 p2 := <-packetQueue 126 - bufPacketQueue := make(chan *PacketizedSegment, 1024) 127 + bufPacketQueue := make(chan *bus.PacketizedSegment, 1024) 127 128 go func() { 128 129 bufPacketQueue <- p1 129 130 bufPacketQueue <- p2