Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.8 294 lines 8.5 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "strings" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "stream.place/streamplace/pkg/config" 15 "stream.place/streamplace/pkg/log" 16) 17 18// For testing. Normally, We don't want to stop the pipeline upon a 19// segmentation error because we want to keep the stream alive. Lots 20// of weird invalid data coming in from WebRTC connections on phones. 21// Better we drop one weird segment than force the stream to restart. 22// But for tests, we want (sometimes) to know if there's a problem. 23var FatalSegmentationErrors = false 24 25// element that takes the input stream, muxes to mp4, and signs the result 26func SegmentElem(ctx context.Context, cli *config.CLI, streamer string, doH264Parse bool, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) { 27 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 28 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 29 "name": "signer", 30 "async-finalize": true, 31 "sink-factory": "appsink", 32 "muxer-factory": "mp4mux", 33 "max-size-bytes": 1, 34 }) 35 if err != nil { 36 return nil, err 37 } 38 39 p := elem.GetRequestPad("video") 40 if p == nil { 41 return nil, fmt.Errorf("failed to get video pad") 42 } 43 p = elem.GetRequestPad("audio_%u") 44 if p == nil { 45 return nil, fmt.Errorf("failed to get audio pad") 46 } 47 48 resetTimer := make(chan struct{}) 49 50 go func() { 51 for { 52 select { 53 case <-ctx.Done(): 54 return 55 case <-resetTimer: 56 continue 57 case <-time.After(time.Second * 30): 58 log.Warn(ctx, "no new segment for 30 seconds") 59 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)") 60 return 61 } 62 } 63 }() 64 65 // we didn't need faststart but i'm leaving this commented here in case 66 // you want to change any other muxer properties in the future 67 68 _, err = elem.Connect("muxer-added", func(split, muxEle *gst.Element) { 69 err := muxEle.SetProperty("presentation-time", false) 70 if err != nil { 71 panic("error setting presentation-time to false: " + err.Error()) 72 } 73 err = muxEle.SetProperty("interleave-bytes", InterleaveBytes) 74 if err != nil { 75 panic("error setting interleave-bytes" + err.Error()) 76 } 77 err = muxEle.SetProperty("interleave-time", InterleaveTime) 78 if err != nil { 79 panic("error setting interleave-time" + err.Error()) 80 } 81 err = muxEle.SetProperty("faststart", true) 82 if err != nil { 83 panic("error setting faststart" + err.Error()) 84 } 85 err = muxEle.SetProperty("movie-timescale", uint(60000)) 86 if err != nil { 87 panic("error setting movie-timescale" + err.Error()) 88 } 89 err = muxEle.SetProperty("trak-timescale", uint(60000)) 90 if err != nil { 91 panic("error setting trak-timescale" + err.Error()) 92 } 93 }) 94 if err != nil { 95 return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err) 96 } 97 98 // channel to make sure data is emitted in order 99 var ch chan struct{} 100 101 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) { 102 previousSegCh := ch 103 mySegCh := make(chan struct{}, 1) 104 ch = mySegCh 105 buf := &bytes.Buffer{} 106 err := sinkEle.SetProperty("sync", false) 107 if err != nil { 108 panic("error setting sync to false: " + err.Error()) 109 } 110 appsink := app.SinkFromElement(sinkEle) 111 if appsink == nil { 112 panic("appsink should not be nil") 113 } 114 115 appsink.SetCallbacks(&app.SinkCallbacks{ 116 NewSampleFunc: WriterNewSample(ctx, buf), 117 EOSFunc: func(sink *app.Sink) { 118 // ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 119 // attribute.String("streamer", ms.Streamer()), 120 // )) 121 // defer span.End() 122 now := time.Now().UnixMilli() 123 bs := buf.Bytes() 124 125 if previousSegCh != nil { 126 <-previousSegCh 127 } 128 resetTimer <- struct{}{} 129 convergeAndSign := func() error { 130 convergedBs, err := ConvergeSegment(ctx, cli, bs, now, streamer, doH264Parse) 131 if err != nil { 132 log.Error(ctx, "error converging segment", "error", err) 133 } else { 134 bs = convergedBs 135 } 136 log.Debug(ctx, "signing segment", "size", len(bs)) 137 err = cb(ctx, bs, now) 138 if err != nil { 139 return fmt.Errorf("error signing segment: %w", err) 140 } 141 return nil 142 } 143 err := func() error { 144 convergeDone := make(chan error) 145 go func() { 146 convergeDone <- convergeAndSign() 147 }() 148 select { 149 case <-ctx.Done(): 150 return ctx.Err() 151 case err := <-convergeDone: 152 return err 153 case <-time.After(time.Second * 3): 154 go func() { 155 err = cli.DataFileWrite([]string{"debug-recordings", streamer, fmt.Sprintf("converge-timeout-%d.mp4", now)}, bytes.NewReader(bs), true) 156 if err != nil { 157 log.Error(ctx, "error writing debug recording", "error", err) 158 } 159 }() 160 return fmt.Errorf("timeout converging segment") 161 } 162 }() 163 close(mySegCh) 164 if err != nil { 165 log.Error(ctx, "error in segmenter", "error", err) 166 if FatalSegmentationErrors { 167 sink.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "error in segmenter", err.Error()) 168 return 169 } 170 } 171 }, 172 }) 173 }) 174 if err != nil { 175 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err) 176 } 177 178 return elem, nil 179} 180 181func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 182 return SegmentElem(ctx, mm.cli, ms.Streamer(), false, func(ctx context.Context, bs []byte, now int64) error { 183 if mm.cli.SmearAudio { 184 smearedBuf := &bytes.Buffer{} 185 err := RewriteAudioTimestamps(ctx, mm.cli, bytes.NewReader(bs), smearedBuf, true) 186 if err != nil { 187 return fmt.Errorf("error smearing audio timestamps: %w", err) 188 } 189 bs = smearedBuf.Bytes() 190 } 191 signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 192 if err != nil { 193 return fmt.Errorf("error calling SignMP4: %w", err) 194 } 195 log.Debug(ctx, "signed segment", "size", len(signedBs)) 196 err = mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true) 197 if err != nil { 198 mm.cli.DumpDebugSegment(ctx, "just-signed-segment.mp4", bytes.NewReader(signedBs)) 199 return fmt.Errorf("error validating just-signed segment: %w", err) 200 } 201 return nil 202 }) 203} 204 205func SegmentFileUnsigned(ctx context.Context, cli *config.CLI, streamer string, input string, ch chan *SplitSegment) error { 206 fd, err := os.OpenFile(input, os.O_RDONLY, 0644) 207 log.Log(ctx, "reading file", "file", input) 208 if err != nil { 209 return fmt.Errorf("failed to read file: %w", err) 210 } 211 defer fd.Close() 212 return SegmentUnsigned(ctx, cli, streamer, fd, false, ch) 213} 214 215func SegmentUnsigned(ctx context.Context, cli *config.CLI, streamer string, input io.Reader, doH264Parse bool, ch chan *SplitSegment) error { 216 ctx, cancel := context.WithCancel(ctx) 217 defer cancel() 218 pipelineSlice := []string{ 219 "appsrc name=appsrc ! qtdemux name=demux", 220 "demux. ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=0", 221 "demux. ! queue ! opusparse name=audioparse", 222 } 223 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 224 if err != nil { 225 return fmt.Errorf("error creating MKVIngest pipeline: %w", err) 226 } 227 228 srcele, err := pipeline.GetElementByName("appsrc") 229 if err != nil { 230 return err 231 } 232 src := app.SrcFromElement(srcele) 233 src.SetCallbacks(&app.SourceCallbacks{ 234 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 235 }) 236 videoParseEle, err := pipeline.GetElementByName("videoparse") 237 if err != nil { 238 return err 239 } 240 241 segmenter, err := SegmentElem(ctx, cli, streamer, doH264Parse, func(ctx context.Context, buf []byte, now int64) error { 242 ch <- &SplitSegment{ 243 Filename: fmt.Sprintf("%d.mp4", now), 244 Data: buf, 245 } 246 return nil 247 }) 248 if err != nil { 249 return err 250 } 251 252 err = pipeline.Add(segmenter) 253 if err != nil { 254 return err 255 } 256 err = videoParseEle.Link(segmenter) 257 if err != nil { 258 return err 259 } 260 audioparse, err := pipeline.GetElementByName("audioparse") 261 if err != nil { 262 return err 263 } 264 err = audioparse.Link(segmenter) 265 if err != nil { 266 return err 267 } 268 269 busErr := make(chan error) 270 go func() { 271 err := HandleBusMessages(ctx, pipeline) 272 cancel() 273 busErr <- err 274 }() 275 276 err = pipeline.SetState(gst.StatePlaying) 277 if err != nil { 278 return err 279 } 280 281 defer func() { 282 err := pipeline.SetState(gst.StateNull) 283 if err != nil { 284 log.Error(ctx, "error setting pipeline to null state", "error", err) 285 } 286 }() 287 288 err = <-busErr 289 if err != nil { 290 return err 291 } 292 293 return nil 294}