Live video on the AT Protocol
at eli/bump-xcode 392 lines 11 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "strings" 10 "sync" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "stream.place/streamplace/pkg/bus" 15 "stream.place/streamplace/pkg/log" 16) 17 18type ConcatStreamer interface { 19 SubscribeSegment(ctx context.Context, user string, rendition string) *bus.SegChan 20 UnsubscribeSegment(ctx context.Context, user string, rendition string, ch *bus.SegChan) 21} 22 23// This function remains in scope for the duration of a single users' playback 24func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, rendition string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) { 25 ctx = log.WithLogValues(ctx, "func", "ConcatStream") 26 ctx, cancel := context.WithCancel(ctx) 27 defer cancel() 28 29 // make 1000000000000 elements! 30 31 // input multiqueue 32 inputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{}) 33 if err != nil { 34 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err) 35 } 36 err = pipeline.Add(inputQueue) 37 if err != nil { 38 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err) 39 } 40 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u") 41 if inputQueuePadVideoSink == nil { 42 return nil, nil, fmt.Errorf("failed to get input queue video sink pad") 43 } 44 inputQueuePadAudioSink := inputQueue.GetRequestPad("sink_%u") 45 if inputQueuePadAudioSink == nil { 46 return nil, nil, fmt.Errorf("failed to get input queue audio sink pad") 47 } 48 inputQueuePadVideoSrc := inputQueue.GetStaticPad("src_0") 49 if inputQueuePadVideoSrc == nil { 50 return nil, nil, fmt.Errorf("failed to get input queue video src pad") 51 } 52 inputQueuePadAudioSrc := inputQueue.GetStaticPad("src_1") 53 if inputQueuePadAudioSrc == nil { 54 return nil, nil, fmt.Errorf("failed to get input queue audio src pad") 55 } 56 // streamsynchronizer 57 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{}) 58 if err != nil { 59 return nil, nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 60 } 61 62 err = pipeline.Add(streamsynchronizer) 63 if err != nil { 64 return nil, nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) 65 } 66 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u") 67 if syncPadVideoSink == nil { 68 return nil, nil, fmt.Errorf("failed to get sync video sink pad") 69 } 70 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u") 71 if syncPadAudioSink == nil { 72 return nil, nil, fmt.Errorf("failed to get sync audio sink pad") 73 } 74 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0") 75 if syncPadVideoSrc == nil { 76 return nil, nil, fmt.Errorf("failed to get sync video src pad") 77 } 78 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1") 79 if syncPadAudioSrc == nil { 80 return nil, nil, fmt.Errorf("failed to get sync audio src pad") 81 } 82 83 // output multiqueue 84 outputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{ 85 "name": "concat-output-queue", 86 }) 87 if err != nil { 88 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err) 89 } 90 err = pipeline.Add(outputQueue) 91 if err != nil { 92 return nil, nil, fmt.Errorf("failed to add output multiqueue to pipeline: %w", err) 93 } 94 outputQueuePadVideoSink := outputQueue.GetRequestPad("sink_%u") 95 if outputQueuePadVideoSink == nil { 96 return nil, nil, fmt.Errorf("failed to get output queue video sink pad") 97 } 98 outputQueuePadAudioSink := outputQueue.GetRequestPad("sink_%u") 99 if outputQueuePadAudioSink == nil { 100 return nil, nil, fmt.Errorf("failed to get output queue audio sink pad") 101 } 102 // linking 103 104 // input queue to streamsynchronizer 105 ret := inputQueuePadVideoSrc.Link(syncPadVideoSink) 106 if ret != gst.PadLinkOK { 107 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret) 108 } 109 ret = inputQueuePadAudioSrc.Link(syncPadAudioSink) 110 if ret != gst.PadLinkOK { 111 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret) 112 } 113 114 // streamsynchronizer to output queue 115 ret = syncPadVideoSrc.Link(outputQueuePadVideoSink) 116 if ret != gst.PadLinkOK { 117 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret) 118 } 119 ret = syncPadAudioSrc.Link(outputQueuePadAudioSink) 120 if ret != gst.PadLinkOK { 121 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret) 122 } 123 124 // ok now we can start looping over input files 125 126 // this goroutine will read all the files from the segment queue and buffer 127 // them in a pipe so that we don't miss any in between iterations of the output 128 allFiles := make(chan []byte, 1024) 129 go func() { 130 ch := streamer.SubscribeSegment(ctx, user, rendition) 131 defer streamer.UnsubscribeSegment(ctx, user, rendition, ch) 132 for { 133 select { 134 case <-ctx.Done(): 135 log.Debug(ctx, "exiting segment reader") 136 return 137 case file := <-ch.C: 138 log.Debug(ctx, "got segment", "file", file.Filepath) 139 allFiles <- file.Data 140 if len(file.Data) == 0 { 141 log.Warn(ctx, "no more segments, stopping segment reader") 142 return 143 } 144 } 145 } 146 }() 147 148 segCount := 0 149 150 // nextFile is the primary loop that pops off a file, creates new demuxer elements for it, 151 // and pushes into the pipeline 152 var nextFile func() 153 nextFile = func() { 154 mySegCount := segCount 155 segCount += 1 156 segDone := make(chan struct{}) 157 log.Debug(ctx, "moving to next file", "segCount", mySegCount) 158 pr, pw := io.Pipe() 159 go func() { 160 select { 161 case <-ctx.Done(): 162 pr.Close() 163 pw.Close() 164 return 165 case bs := <-allFiles: 166 if len(bs) == 0 { 167 log.Warn(ctx, "no more segments, ending stream") 168 pr.Close() 169 pw.Close() 170 cancel() 171 return 172 } 173 _, err = io.Copy(pw, bytes.NewReader(bs)) 174 if err != nil { 175 log.Error(ctx, "failed to copy segment file", "error", err) 176 cancel() 177 return 178 } 179 return 180 } 181 }() 182 183 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{ 184 "name": fmt.Sprintf("concat-demux-%d", mySegCount), 185 }) 186 if err != nil { 187 log.Error(ctx, "failed to create demux element", "error", err) 188 cancel() 189 return 190 } 191 192 err = pipeline.Add(demux) 193 if err != nil { 194 log.Error(ctx, "failed to add demux to pipeline", "error", err) 195 cancel() 196 return 197 } 198 199 demuxSinkPad := demux.GetStaticPad("sink") 200 if demuxSinkPad == nil { 201 log.Error(ctx, "failed to get demux sink pad") 202 cancel() 203 return 204 } 205 206 mu := sync.Mutex{} 207 count := 0 208 _, err = demux.Connect("pad-added", func(self *gst.Element, pad *gst.Pad) { 209 mu.Lock() 210 count += 1 211 mu.Unlock() 212 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection()) 213 var downstreamPad *gst.Pad 214 if strings.HasPrefix(pad.GetName(), "video_") { 215 downstreamPad = inputQueuePadVideoSink 216 } else if strings.HasPrefix(pad.GetName(), "audio_") { 217 downstreamPad = inputQueuePadAudioSink 218 } else { 219 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection()) 220 cancel() 221 return 222 } 223 ret := pad.Link(downstreamPad) 224 if ret != gst.PadLinkOK { 225 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret) 226 cancel() 227 return 228 } 229 if pad.GetDirection() == gst.PadDirectionSource { 230 pad.AddProbe(gst.PadProbeTypeEventBoth, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 231 if info.GetEvent().Type() != gst.EventTypeEOS { 232 return gst.PadProbeOK 233 } 234 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection()) 235 pad.Unlink(downstreamPad) 236 mu.Lock() 237 defer mu.Unlock() 238 count -= 1 239 240 if count == 0 { 241 // don't keep going if our context is done 242 if ctx.Err() == nil { 243 go nextFile() 244 segDone <- struct{}{} 245 } 246 } else { 247 log.Debug(ctx, "demux has more pads, waiting for them to close") 248 } 249 return gst.PadProbeRemove 250 }) 251 } 252 }) 253 if err != nil { 254 log.Error(ctx, "failed to connect demux pad-added", "error", err) 255 cancel() 256 return 257 } 258 259 appsrc, err := gst.NewElementWithProperties("appsrc", map[string]any{ 260 "name": fmt.Sprintf("concat-appsrc-%d", mySegCount), 261 "is-live": true, 262 }) 263 if err != nil { 264 log.Error(ctx, "failed to get appsrc element from pipeline", "error", err) 265 cancel() 266 return 267 } 268 269 src := app.SrcFromElement(appsrc) 270 271 appSrcPad := appsrc.GetStaticPad("src") 272 if appSrcPad == nil { 273 log.Error(ctx, "failed to get appsrc pad") 274 cancel() 275 return 276 } 277 278 done := func() { 279 // appsrc.Unlink(demux) 280 pads, err := src.GetPads() 281 if err != nil { 282 log.Error(ctx, "failed to get pads", "error", err) 283 cancel() 284 return 285 } 286 for _, pad := range pads { 287 log.Debug(ctx, "setting pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 288 289 pad.AddProbe(gst.PadProbeTypeIdle, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 290 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 291 src.EndStream() 292 return gst.PadProbeRemove 293 }) 294 } 295 } 296 297 src.SetAutomaticEOS(false) 298 src.SetCallbacks(&app.SourceCallbacks{ 299 NeedDataFunc: func(self *app.Source, length uint) { 300 bs := make([]byte, length) 301 read, err := pr.Read(bs) 302 if err != nil { 303 if errors.Is(err, io.EOF) { 304 if read > 0 { 305 log.Debug(ctx, "got data on eof???") 306 cancel() 307 return 308 } 309 log.Debug(ctx, "EOF, ending segment", "length", read) 310 done() 311 return 312 } else { 313 log.Debug(ctx, "failed to read data, ending stream", "error", err) 314 cancel() 315 return 316 } 317 } 318 toPush := bs 319 if uint(read) < length { 320 toPush = bs[:read] 321 } 322 buffer := gst.NewBufferWithSize(int64(len(toPush))) 323 buffer.Map(gst.MapWrite).WriteData(toPush) 324 defer buffer.Unmap() 325 self.PushBuffer(buffer) 326 327 if uint(read) < length { 328 log.Debug(ctx, "short write, ending segment", "length", read) 329 done() 330 } 331 }, 332 }) 333 err = pipeline.Add(appsrc) 334 if err != nil { 335 log.Error(ctx, "failed to add appsrc to pipeline", "error", err) 336 cancel() 337 return 338 } 339 340 ret := appSrcPad.Link(demuxSinkPad) 341 if ret != gst.PadLinkOK { 342 log.Error(ctx, "failed to link appsrc to demux", "error", ret) 343 cancel() 344 return 345 } 346 347 err = demux.SetState(gst.StatePlaying) 348 if err != nil { 349 log.Error(ctx, "failed to set demux state", "error", err) 350 cancel() 351 return 352 } 353 err = appsrc.SetState(gst.StatePlaying) 354 if err != nil { 355 log.Error(ctx, "failed to set appsrc state", "error", err) 356 cancel() 357 return 358 } 359 360 select { 361 case <-ctx.Done(): 362 return 363 case <-segDone: 364 } 365 366 log.Debug(ctx, "ending segment") 367 if err := demux.SetState(gst.StateNull); err != nil { 368 log.Error(ctx, "failed to set demux state", "error", err) 369 return 370 } 371 src.SetCallbacks(&app.SourceCallbacks{}) 372 if err := appsrc.SetState(gst.StateNull); err != nil { 373 log.Error(ctx, "failed to set appsrc state", "error", err) 374 return 375 } 376 if err := pipeline.Remove(demux); err != nil { 377 log.Error(ctx, "failed to remove demux from pipleine", "error", err) 378 return 379 } 380 if err := pipeline.Remove(appsrc); err != nil { 381 log.Error(ctx, "failed to remove appsrc from pipleine", "error", err) 382 return 383 } 384 pr.Close() 385 pw.Close() 386 } 387 388 // fire it up! 389 go nextFile() 390 391 return outputQueue, ctx.Done(), nil 392}