Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #319 from streamplace/eli/new-webrtc-fixes

media: fix some new webrtc caching

authored by

Eli Mallon and committed by
GitHub
efaace9f b1a5bbf3

+37 -41
+11
pkg/bus/segchanman.go
··· 107 107 } 108 108 curBuf = append(curBuf, seg) 109 109 if len(curBuf) > bufSize { 110 + log.Warn(ctx, "segment buffer is too large, dropping oldest segment", "user", user, "rendition", rendition, "bufSize", bufSize, "len", len(curBuf)) 110 111 curBuf = curBuf[1:] 111 112 } 112 113 b.segBuf[key] = curBuf ··· 127 128 }(ch) 128 129 } 129 130 } 131 + 132 + func (b *Bus) EndSession(ctx context.Context, user string, rendition string) { 133 + b.segChansMutex.Lock() 134 + defer b.segChansMutex.Unlock() 135 + b.segBufMutex.Lock() 136 + defer b.segBufMutex.Unlock() 137 + 138 + key := segChanKey(user, rendition) 139 + delete(b.segBuf, key) 140 + }
+3
pkg/director/stream_session.go
··· 108 108 // case <-time.After(time.Minute * 1): 109 109 case <-time.After(time.Second * 60): 110 110 log.Log(ctx, "no new segments for 1 minute, shutting down") 111 + for _, r := range allRenditions { 112 + ss.bus.EndSession(ctx, spseg.Creator, r.Name) 113 + } 111 114 cancel() 112 115 } 113 116 }
+8 -16
pkg/media/packetize.go
··· 98 98 return gst.FlowError 99 99 } 100 100 101 - samples := buffer.Map(gst.MapRead).Bytes() 102 - defer buffer.Unmap() 101 + samples := buffer.Bytes() 103 102 104 103 videoOutput = append(videoOutput, samples) 105 104 ··· 115 114 }, 116 115 EOSFunc: func(sink *app.Sink) { 117 116 log.Debug(ctx, "videoappsink EOSFunc") 118 - // go func() { 119 - // eosCh <- struct{}{} 120 - // }() 121 117 }, 122 118 }) 123 119 ··· 128 124 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 129 125 sample := sink.PullSample() 130 126 if sample == nil { 127 + log.Warn(ctx, "audioappsink NewSampleFunc EOS") 131 128 return gst.FlowEOS 132 129 } 133 130 ··· 136 133 return gst.FlowError 137 134 } 138 135 139 - samples := buffer.Map(gst.MapRead).Bytes() 140 - defer buffer.Unmap() 136 + samples := buffer.Bytes() 141 137 142 138 audioOutput = append(audioOutput, samples) 143 139 ··· 146 142 if dur != nil { 147 143 segDur += *dur 148 144 } else { 149 - log.Log(ctx, "no audio duration", "samples", len(samples)) 150 - err := fmt.Errorf("no audio duration") 151 - pipeline.Error(err.Error(), err) 145 + log.Error(ctx, "no audio duration", "samples", len(samples)) 152 146 return gst.FlowError 153 147 } 154 148 ··· 156 150 }, 157 151 EOSFunc: func(sink *app.Sink) { 158 152 log.Debug(ctx, "audioappsink EOSFunc") 159 - // go func() { 160 - // eosCh <- struct{}{} 161 - // }() 162 153 }, 163 154 }) 164 155 ··· 181 172 if err != nil { 182 173 log.Error(ctx, "failed to set pipeline to null state", "error", err) 183 174 } 175 + err = pipeline.Remove(demuxBin.Element) 176 + if err != nil { 177 + log.Error(ctx, "failed to remove demux bin from bin", "error", err) 178 + } 184 179 }() 185 - 186 - // <-eosCh 187 - // <-eosCh 188 180 189 181 err = <-busErr 190 182 if err != nil {
+15 -12
pkg/media/webrtc_ingest.go
··· 113 113 // Create channel that is blocked until ICE Gathering is complete 114 114 gatherComplete := rtcrec.GatheringCompletePromise(peerConnection) 115 115 116 - go func() { 117 - ticker := time.NewTicker(time.Second * 1) 118 - for { 119 - select { 120 - case <-ctx.Done(): 121 - return 122 - case <-ticker.C: 123 - state := pipeline.GetCurrentState() 124 - log.Debug(ctx, "pipeline state", "state", state) 125 - } 126 - } 127 - }() 128 116 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 129 117 130 118 go func() { 131 119 ctx, cancel := context.WithCancel(ctx) 132 120 defer cancel() 133 121 defer func() { close(done) }() 122 + 123 + go func() { 124 + ticker := time.NewTicker(time.Second * 1) 125 + for { 126 + select { 127 + case <-ctx.Done(): 128 + return 129 + case <-ticker.C: 130 + state := pipeline.GetCurrentState() 131 + log.Debug(ctx, "pipeline state", "state", state) 132 + } 133 + } 134 + }() 134 135 135 136 go func() { 136 137 if err := HandleBusMessages(ctx, pipeline); err != nil { ··· 294 295 if err := videoSrcElem.SetState(gst.StateNull); err != nil { 295 296 log.Log(ctx, "failed to set videoSrcElem state to null", "error", err) 296 297 } 298 + 299 + log.Log(ctx, "webrtc ingest pipeline done") 297 300 298 301 }() 299 302 select {
-13
pkg/media/webrtc_playback2.go
··· 72 72 ctx, cancel := context.WithCancel(ctx) 73 73 defer cancel() 74 74 75 - started := time.Now() 76 - elapsed := time.Duration(0) 77 75 latency := time.Duration(0) 78 - 79 - go func() { 80 - for { 81 - select { 82 - case <-ctx.Done(): 83 - return 84 - case <-time.After(time.Second * 5): 85 - log.Log(ctx, "check elapsed", "elapsed", elapsed, "duration", time.Since(started)) 86 - } 87 - } 88 - }() 89 76 90 77 packetQueue := make(chan *bus.PacketizedSegment, 1024) 91 78 go func() {