Live video on the AT Protocol

Merge pull request #313 from streamplace/eli/fix-stuck-playback-channel-2

director: handle segments that come in after session closes

authored by Eli Mallon and committed by GitHub 79f65d8f 8025558e

+15 -4
+1
pkg/director/director.go
··· 67 segmentChan: make(chan struct{}), 68 op: d.op, 69 packets: make([]bus.PacketizedSegment, 0), 70 } 71 d.streamSessions[not.Segment.RepoDID] = ss 72 g.Go(func() error {
··· 67 segmentChan: make(chan struct{}), 68 op: d.op, 69 packets: make([]bus.PacketizedSegment, 0), 70 + started: make(chan struct{}), 71 } 72 d.streamSessions[not.Segment.RepoDID] = ss 73 g.Go(func() error {
+14 -4
pkg/director/stream_session.go
··· 39 lastStatus time.Time 40 lastStatusLock sync.Mutex 41 g *errgroup.Group 42 packets []bus.PacketizedSegment 43 } 44 ··· 47 ss.g, ctx = errgroup.WithContext(ctx) 48 sid := livepeer.RandomTrailer(8) 49 ctx = log.WithLogValues(ctx, "sid", sid) 50 log.Log(ctx, "starting stream session") 51 defer cancel() 52 spseg, err := not.Segment.ToStreamplaceSegment() ··· 94 // }) 95 // } 96 97 for { 98 select { 99 case <-ss.segmentChan: ··· 112 // non-fatal; if you actually want to melt the universe on an error you 113 // should panic() 114 func (ss *StreamSession) Go(ctx context.Context, f func() error) { 115 ss.g.Go(func() error { 116 err := f() 117 if err != nil { ··· 122 } 123 124 func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error { 125 - if ctx.Err() != nil { 126 - return nil 127 - } 128 - ss.segmentChan <- struct{}{} 129 aqt := aqtime.FromTime(not.Segment.StartTime) 130 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 131 err := ss.mod.CreateSegment(not.Segment)
··· 39 lastStatus time.Time 40 lastStatusLock sync.Mutex 41 g *errgroup.Group 42 + started chan struct{} 43 + ctx context.Context 44 packets []bus.PacketizedSegment 45 } 46 ··· 49 ss.g, ctx = errgroup.WithContext(ctx) 50 sid := livepeer.RandomTrailer(8) 51 ctx = log.WithLogValues(ctx, "sid", sid) 52 + ss.ctx = ctx 53 log.Log(ctx, "starting stream session") 54 defer cancel() 55 spseg, err := not.Segment.ToStreamplaceSegment() ··· 97 // }) 98 // } 99 100 + close(ss.started) 101 + 102 for { 103 select { 104 case <-ss.segmentChan: ··· 117 // non-fatal; if you actually want to melt the universe on an error you 118 // should panic() 119 func (ss *StreamSession) Go(ctx context.Context, f func() error) { 120 + <-ss.started 121 ss.g.Go(func() error { 122 err := f() 123 if err != nil { ··· 128 } 129 130 func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error { 131 + <-ss.started 132 + go func() { 133 + select { 134 + case <-ss.ctx.Done(): 135 + return 136 + case ss.segmentChan <- struct{}{}: 137 + } 138 + }() 139 aqt := aqtime.FromTime(not.Segment.StartTime) 140 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 141 err := ss.mod.CreateSegment(not.Segment)