Live video on the AT Protocol
at next 294 lines 8.5 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "strings" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "stream.place/streamplace/pkg/config" 15 "stream.place/streamplace/pkg/log" 16) 17 18// For testing. Normally, We don't want to stop the pipeline upon a 19// segmentation error because we want to keep the stream alive. Lots 20// of weird invalid data coming in from WebRTC connections on phones. 21// Better we drop one weird segment than force the stream to restart. 22// But for tests, we want (sometimes) to know if there's a problem. 23var FatalSegmentationErrors = false 24 25// element that takes the input stream, muxes to mp4, and signs the result 26func SegmentElem(ctx context.Context, cli *config.CLI, streamer string, doH264Parse bool, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) { 27 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 28 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 29 "name": "signer", 30 "async-finalize": true, 31 "sink-factory": "appsink", 32 "muxer-factory": "mp4mux", 33 "max-size-bytes": 1, 34 }) 35 if err != nil { 36 return nil, err 37 } 38 39 p := elem.GetRequestPad("video") 40 if p == nil { 41 return nil, fmt.Errorf("failed to get video pad") 42 } 43 p = elem.GetRequestPad("audio_%u") 44 if p == nil { 45 return nil, fmt.Errorf("failed to get audio pad") 46 } 47 48 resetTimer := make(chan struct{}) 49 50 go func() { 51 for { 52 select { 53 case <-ctx.Done(): 54 return 55 case <-resetTimer: 56 continue 57 case <-time.After(time.Second * 30): 58 log.Warn(ctx, "no new segment for 30 seconds") 59 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)") 60 return 61 } 62 } 63 }() 64 65 // we didn't need faststart but i'm leaving this commented here in case 66 // you want to change any other muxer properties in the future 67 68 _, err = elem.Connect("muxer-added", func(split, muxEle *gst.Element) { 69 err := muxEle.SetProperty("presentation-time", false) 70 if err != nil { 71 panic("error setting presentation-time to false: " + err.Error()) 72 } 73 err = muxEle.SetProperty("interleave-bytes", InterleaveBytes) 74 if err != nil { 75 panic("error setting interleave-bytes" + err.Error()) 76 } 77 err = muxEle.SetProperty("interleave-time", InterleaveTime) 78 if err != nil { 79 panic("error setting interleave-time" + err.Error()) 80 } 81 err = muxEle.SetProperty("faststart", true) 82 if err != nil { 83 panic("error setting faststart" + err.Error()) 84 } 85 err = muxEle.SetProperty("movie-timescale", uint(60000)) 86 if err != nil { 87 panic("error setting movie-timescale" + err.Error()) 88 } 89 err = muxEle.SetProperty("trak-timescale", uint(60000)) 90 if err != nil { 91 panic("error setting trak-timescale" + err.Error()) 92 } 93 }) 94 if err != nil { 95 return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err) 96 } 97 98 // channel to make sure data is emitted in order 99 var ch chan struct{} 100 101 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) { 102 previousSegCh := ch 103 mySegCh := make(chan struct{}, 1) 104 ch = mySegCh 105 buf := &bytes.Buffer{} 106 err := sinkEle.SetProperty("sync", false) 107 if err != nil { 108 panic("error setting sync to false: " + err.Error()) 109 } 110 appsink := app.SinkFromElement(sinkEle) 111 if appsink == nil { 112 panic("appsink should not be nil") 113 } 114 115 appsink.SetCallbacks(&app.SinkCallbacks{ 116 NewSampleFunc: WriterNewSample(ctx, buf), 117 EOSFunc: func(sink *app.Sink) { 118 // ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 119 // attribute.String("streamer", ms.Streamer()), 120 // )) 121 // defer span.End() 122 now := time.Now().UnixMilli() 123 bs := buf.Bytes() 124 125 if previousSegCh != nil { 126 <-previousSegCh 127 } 128 resetTimer <- struct{}{} 129 convergeAndSign := func() error { 130 convergedBs, err := ConvergeSegment(ctx, cli, bs, now, streamer, doH264Parse) 131 if err != nil { 132 log.Error(ctx, "error converging segment", "error", err) 133 } else { 134 bs = convergedBs 135 } 136 log.Debug(ctx, "signing segment", "size", len(bs)) 137 err = cb(ctx, bs, now) 138 if err != nil { 139 return fmt.Errorf("error signing segment: %w", err) 140 } 141 return nil 142 } 143 err := func() error { 144 convergeDone := make(chan error) 145 go func() { 146 convergeDone <- convergeAndSign() 147 }() 148 select { 149 case <-ctx.Done(): 150 return ctx.Err() 151 case err := <-convergeDone: 152 return err 153 case <-time.After(time.Second * 3): 154 go func() { 155 err = cli.DataFileWrite([]string{"debug-recordings", streamer, fmt.Sprintf("converge-timeout-%d.mp4", now)}, bytes.NewReader(bs), true) 156 if err != nil { 157 log.Error(ctx, "error writing debug recording", "error", err) 158 } 159 }() 160 return fmt.Errorf("timeout converging segment") 161 } 162 }() 163 close(mySegCh) 164 if err != nil { 165 log.Error(ctx, "error in segmenter", "error", err) 166 if FatalSegmentationErrors { 167 sink.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "error in segmenter", err.Error()) 168 return 169 } 170 } 171 }, 172 }) 173 }) 174 if err != nil { 175 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err) 176 } 177 178 return elem, nil 179} 180 181func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 182 return SegmentElem(ctx, mm.cli, ms.Streamer(), false, func(ctx context.Context, bs []byte, now int64) error { 183 if mm.cli.SmearAudio { 184 smearedBuf := &bytes.Buffer{} 185 err := RewriteAudioTimestamps(ctx, mm.cli, bytes.NewReader(bs), smearedBuf, true) 186 if err != nil { 187 return fmt.Errorf("error smearing audio timestamps: %w", err) 188 } 189 bs = smearedBuf.Bytes() 190 } 191 signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 192 if err != nil { 193 return fmt.Errorf("error calling SignMP4: %w", err) 194 } 195 log.Debug(ctx, "signed segment", "size", len(signedBs)) 196 err = mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true) 197 if err != nil { 198 mm.cli.DumpDebugSegment(ctx, "just-signed-segment.mp4", bytes.NewReader(signedBs)) 199 return fmt.Errorf("error validating just-signed segment: %w", err) 200 } 201 return nil 202 }) 203} 204 205func SegmentFileUnsigned(ctx context.Context, cli *config.CLI, streamer string, input string, ch chan *SplitSegment) error { 206 fd, err := os.OpenFile(input, os.O_RDONLY, 0644) 207 log.Log(ctx, "reading file", "file", input) 208 if err != nil { 209 return fmt.Errorf("failed to read file: %w", err) 210 } 211 defer fd.Close() 212 return SegmentUnsigned(ctx, cli, streamer, fd, false, ch) 213} 214 215func SegmentUnsigned(ctx context.Context, cli *config.CLI, streamer string, input io.Reader, doH264Parse bool, ch chan *SplitSegment) error { 216 ctx, cancel := context.WithCancel(ctx) 217 defer cancel() 218 pipelineSlice := []string{ 219 "appsrc name=appsrc ! qtdemux name=demux", 220 "demux. ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=0", 221 "demux. ! queue ! opusparse name=audioparse", 222 } 223 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 224 if err != nil { 225 return fmt.Errorf("error creating MKVIngest pipeline: %w", err) 226 } 227 228 srcele, err := pipeline.GetElementByName("appsrc") 229 if err != nil { 230 return err 231 } 232 src := app.SrcFromElement(srcele) 233 src.SetCallbacks(&app.SourceCallbacks{ 234 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 235 }) 236 videoParseEle, err := pipeline.GetElementByName("videoparse") 237 if err != nil { 238 return err 239 } 240 241 segmenter, err := SegmentElem(ctx, cli, streamer, doH264Parse, func(ctx context.Context, buf []byte, now int64) error { 242 ch <- &SplitSegment{ 243 Filename: fmt.Sprintf("%d.mp4", now), 244 Data: buf, 245 } 246 return nil 247 }) 248 if err != nil { 249 return err 250 } 251 252 err = pipeline.Add(segmenter) 253 if err != nil { 254 return err 255 } 256 err = videoParseEle.Link(segmenter) 257 if err != nil { 258 return err 259 } 260 audioparse, err := pipeline.GetElementByName("audioparse") 261 if err != nil { 262 return err 263 } 264 err = audioparse.Link(segmenter) 265 if err != nil { 266 return err 267 } 268 269 busErr := make(chan error) 270 go func() { 271 err := HandleBusMessages(ctx, pipeline) 272 cancel() 273 busErr <- err 274 }() 275 276 err = pipeline.SetState(gst.StatePlaying) 277 if err != nil { 278 return err 279 } 280 281 defer func() { 282 err := pipeline.SetState(gst.StateNull) 283 if err != nil { 284 log.Error(ctx, "error setting pipeline to null state", "error", err) 285 } 286 }() 287 288 err = <-busErr 289 if err != nil { 290 return err 291 } 292 293 return nil 294}