Live video on the AT Protocol
at eli/rtmprec 272 lines 7.9 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "strings" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "stream.place/streamplace/pkg/config" 15 "stream.place/streamplace/pkg/log" 16) 17 18// For testing. Normally, We don't want to stop the pipeline upon a 19// segmentation error because we want to keep the stream alive. Lots 20// of weird invalid data coming in from WebRTC connections on phones. 21// Better we drop one weird segment than force the stream to restart. 22// But for tests, we want (sometimes) to know if there's a problem. 23var FatalSegmentationErrors = false 24 25// element that takes the input stream, muxes to mp4, and signs the result 26func SegmentElem(ctx context.Context, cli *config.CLI, streamer string, doH264Parse bool, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) { 27 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 28 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 29 "name": "signer", 30 "async-finalize": true, 31 "sink-factory": "appsink", 32 "muxer-factory": "mp4mux", 33 "max-size-bytes": 1, 34 }) 35 if err != nil { 36 return nil, err 37 } 38 39 p := elem.GetRequestPad("video") 40 if p == nil { 41 return nil, fmt.Errorf("failed to get video pad") 42 } 43 p = elem.GetRequestPad("audio_%u") 44 if p == nil { 45 return nil, fmt.Errorf("failed to get audio pad") 46 } 47 48 resetTimer := make(chan struct{}) 49 50 go func() { 51 for { 52 select { 53 case <-ctx.Done(): 54 return 55 case <-resetTimer: 56 continue 57 case <-time.After(time.Second * 30): 58 log.Warn(ctx, "no new segment for 30 seconds") 59 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)") 60 return 61 } 62 } 63 }() 64 65 // we didn't need faststart but i'm leaving this commented here in case 66 // you want to change any other muxer properties in the future 67 68 _, err = elem.Connect("muxer-added", func(split, muxEle *gst.Element) { 69 err := muxEle.SetProperty("presentation-time", false) 70 if err != nil { 71 panic("error setting presentation-time to false: " + err.Error()) 72 } 73 err = muxEle.SetProperty("interleave-bytes", InterleaveBytes) 74 if err != nil { 75 panic("error setting interleave-bytes" + err.Error()) 76 } 77 err = muxEle.SetProperty("interleave-time", InterleaveTime) 78 if err != nil { 79 panic("error setting interleave-time" + err.Error()) 80 } 81 err = muxEle.SetProperty("faststart", true) 82 if err != nil { 83 panic("error setting faststart" + err.Error()) 84 } 85 err = muxEle.SetProperty("movie-timescale", uint(60000)) 86 if err != nil { 87 panic("error setting movie-timescale" + err.Error()) 88 } 89 err = muxEle.SetProperty("trak-timescale", uint(60000)) 90 if err != nil { 91 panic("error setting trak-timescale" + err.Error()) 92 } 93 }) 94 if err != nil { 95 return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err) 96 } 97 98 // channel to make sure data is emitted in order 99 var ch chan struct{} 100 101 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) { 102 previousSegCh := ch 103 mySegCh := make(chan struct{}, 1) 104 ch = mySegCh 105 buf := &bytes.Buffer{} 106 err := sinkEle.SetProperty("sync", false) 107 if err != nil { 108 panic("error setting sync to false: " + err.Error()) 109 } 110 appsink := app.SinkFromElement(sinkEle) 111 if appsink == nil { 112 panic("appsink should not be nil") 113 } 114 115 appsink.SetCallbacks(&app.SinkCallbacks{ 116 NewSampleFunc: WriterNewSample(ctx, buf), 117 EOSFunc: func(sink *app.Sink) { 118 // ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 119 // attribute.String("streamer", ms.Streamer()), 120 // )) 121 // defer span.End() 122 now := time.Now().UnixMilli() 123 resetTimer <- struct{}{} 124 bs := buf.Bytes() 125 126 if previousSegCh != nil { 127 <-previousSegCh 128 } 129 err := func() error { 130 bs, err := ConvergeSegment(ctx, cli, bs, now, streamer, doH264Parse) 131 if err != nil { 132 return fmt.Errorf("error converging segment: %w", err) 133 } 134 log.Debug(ctx, "signing segment", "size", len(bs)) 135 err = cb(ctx, bs, now) 136 if err != nil { 137 return fmt.Errorf("error signing segment: %w", err) 138 } 139 return nil 140 }() 141 close(mySegCh) 142 if err != nil { 143 log.Error(ctx, "error in segmenter", "error", err) 144 if FatalSegmentationErrors { 145 sink.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "error in segmenter", err.Error()) 146 return 147 } 148 } 149 }, 150 }) 151 }) 152 if err != nil { 153 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err) 154 } 155 156 return elem, nil 157} 158 159func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 160 return SegmentElem(ctx, mm.cli, ms.Streamer(), false, func(ctx context.Context, bs []byte, now int64) error { 161 if mm.cli.SmearAudio { 162 smearedBuf := &bytes.Buffer{} 163 err := RewriteAudioTimestamps(ctx, mm.cli, bytes.NewReader(bs), smearedBuf, true) 164 if err != nil { 165 return fmt.Errorf("error smearing audio timestamps: %w", err) 166 } 167 bs = smearedBuf.Bytes() 168 } 169 signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 170 if err != nil { 171 return fmt.Errorf("error calling SignMP4: %w", err) 172 } 173 log.Debug(ctx, "signed segment", "size", len(signedBs)) 174 err = mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true) 175 if err != nil { 176 mm.cli.DumpDebugSegment(ctx, "just-signed-segment.mp4", bytes.NewReader(signedBs)) 177 return fmt.Errorf("error validating just-signed segment: %w", err) 178 } 179 return nil 180 }) 181} 182 183func SegmentFileUnsigned(ctx context.Context, cli *config.CLI, streamer string, input string, ch chan *SplitSegment) error { 184 fd, err := os.OpenFile(input, os.O_RDONLY, 0644) 185 log.Log(ctx, "reading file", "file", input) 186 if err != nil { 187 return fmt.Errorf("failed to read file: %w", err) 188 } 189 defer fd.Close() 190 return SegmentUnsigned(ctx, cli, streamer, fd, false, ch) 191} 192 193func SegmentUnsigned(ctx context.Context, cli *config.CLI, streamer string, input io.Reader, doH264Parse bool, ch chan *SplitSegment) error { 194 ctx, cancel := context.WithCancel(ctx) 195 defer cancel() 196 pipelineSlice := []string{ 197 "appsrc name=appsrc ! qtdemux name=demux", 198 "demux. ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=0", 199 "demux. ! queue ! opusparse name=audioparse", 200 } 201 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 202 if err != nil { 203 return fmt.Errorf("error creating MKVIngest pipeline: %w", err) 204 } 205 206 srcele, err := pipeline.GetElementByName("appsrc") 207 if err != nil { 208 return err 209 } 210 src := app.SrcFromElement(srcele) 211 src.SetCallbacks(&app.SourceCallbacks{ 212 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 213 }) 214 videoParseEle, err := pipeline.GetElementByName("videoparse") 215 if err != nil { 216 return err 217 } 218 219 segmenter, err := SegmentElem(ctx, cli, streamer, doH264Parse, func(ctx context.Context, buf []byte, now int64) error { 220 ch <- &SplitSegment{ 221 Filename: fmt.Sprintf("%d.mp4", now), 222 Data: buf, 223 } 224 return nil 225 }) 226 if err != nil { 227 return err 228 } 229 230 err = pipeline.Add(segmenter) 231 if err != nil { 232 return err 233 } 234 err = videoParseEle.Link(segmenter) 235 if err != nil { 236 return err 237 } 238 audioparse, err := pipeline.GetElementByName("audioparse") 239 if err != nil { 240 return err 241 } 242 err = audioparse.Link(segmenter) 243 if err != nil { 244 return err 245 } 246 247 busErr := make(chan error) 248 go func() { 249 err := HandleBusMessages(ctx, pipeline) 250 cancel() 251 busErr <- err 252 }() 253 254 err = pipeline.SetState(gst.StatePlaying) 255 if err != nil { 256 return err 257 } 258 259 defer func() { 260 err := pipeline.SetState(gst.StateNull) 261 if err != nil { 262 log.Error(ctx, "error setting pipeline to null state", "error", err) 263 } 264 }() 265 266 err = <-busErr 267 if err != nil { 268 return err 269 } 270 271 return nil 272}