Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.4 392 lines 11 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "strings" 10 "sync" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "stream.place/streamplace/pkg/bus" 15 "stream.place/streamplace/pkg/log" 16) 17 18type ConcatStreamer interface { 19 SubscribeSegment(ctx context.Context, user string, rendition string) *bus.SegChan 20 UnsubscribeSegment(ctx context.Context, user string, rendition string, ch *bus.SegChan) 21} 22 23// This function remains in scope for the duration of a single users' playback 24func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, rendition string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) { 25 ctx = log.WithLogValues(ctx, "func", "ConcatStream") 26 ctx, cancel := context.WithCancel(ctx) 27 defer cancel() 28 29 // make 1000000000000 elements! 30 31 // input multiqueue 32 inputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{}) 33 if err != nil { 34 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err) 35 } 36 err = pipeline.Add(inputQueue) 37 if err != nil { 38 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err) 39 } 40 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u") 41 if inputQueuePadVideoSink == nil { 42 return nil, nil, fmt.Errorf("failed to get input queue video sink pad") 43 } 44 inputQueuePadAudioSink := inputQueue.GetRequestPad("sink_%u") 45 if inputQueuePadAudioSink == nil { 46 return nil, nil, fmt.Errorf("failed to get input queue audio sink pad") 47 } 48 inputQueuePadVideoSrc := inputQueue.GetStaticPad("src_0") 49 if inputQueuePadVideoSrc == nil { 50 return nil, nil, fmt.Errorf("failed to get input queue video src pad") 51 } 52 inputQueuePadAudioSrc := inputQueue.GetStaticPad("src_1") 53 if inputQueuePadAudioSrc == nil { 54 return nil, nil, fmt.Errorf("failed to get input queue audio src pad") 55 } 56 // streamsynchronizer 57 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{}) 58 if err != nil { 59 return nil, nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 60 } 61 62 err = pipeline.Add(streamsynchronizer) 63 if err != nil { 64 return nil, nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) 65 } 66 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u") 67 if syncPadVideoSink == nil { 68 return nil, nil, fmt.Errorf("failed to get sync video sink pad") 69 } 70 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u") 71 if syncPadAudioSink == nil { 72 return nil, nil, fmt.Errorf("failed to get sync audio sink pad") 73 } 74 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0") 75 if syncPadVideoSrc == nil { 76 return nil, nil, fmt.Errorf("failed to get sync video src pad") 77 } 78 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1") 79 if syncPadAudioSrc == nil { 80 return nil, nil, fmt.Errorf("failed to get sync audio src pad") 81 } 82 83 // output multiqueue 84 outputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{ 85 "name": "concat-output-queue", 86 }) 87 if err != nil { 88 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err) 89 } 90 err = pipeline.Add(outputQueue) 91 if err != nil { 92 return nil, nil, fmt.Errorf("failed to add output multiqueue to pipeline: %w", err) 93 } 94 outputQueuePadVideoSink := outputQueue.GetRequestPad("sink_%u") 95 if outputQueuePadVideoSink == nil { 96 return nil, nil, fmt.Errorf("failed to get output queue video sink pad") 97 } 98 outputQueuePadAudioSink := outputQueue.GetRequestPad("sink_%u") 99 if outputQueuePadAudioSink == nil { 100 return nil, nil, fmt.Errorf("failed to get output queue audio sink pad") 101 } 102 // linking 103 104 // input queue to streamsynchronizer 105 ret := inputQueuePadVideoSrc.Link(syncPadVideoSink) 106 if ret != gst.PadLinkOK { 107 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret) 108 } 109 ret = inputQueuePadAudioSrc.Link(syncPadAudioSink) 110 if ret != gst.PadLinkOK { 111 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret) 112 } 113 114 // streamsynchronizer to output queue 115 ret = syncPadVideoSrc.Link(outputQueuePadVideoSink) 116 if ret != gst.PadLinkOK { 117 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret) 118 } 119 ret = syncPadAudioSrc.Link(outputQueuePadAudioSink) 120 if ret != gst.PadLinkOK { 121 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret) 122 } 123 124 // ok now we can start looping over input files 125 126 // this goroutine will read all the files from the segment queue and buffer 127 // them in a pipe so that we don't miss any in between iterations of the output 128 allFiles := make(chan []byte, 1024) 129 go func() { 130 ch := streamer.SubscribeSegment(ctx, user, rendition) 131 defer streamer.UnsubscribeSegment(ctx, user, rendition, ch) 132 for { 133 select { 134 case <-ctx.Done(): 135 log.Debug(ctx, "exiting segment reader") 136 return 137 case file := <-ch.C: 138 log.Debug(ctx, "got segment", "file", file.Filepath) 139 allFiles <- file.Data 140 if len(file.Data) == 0 { 141 log.Warn(ctx, "no more segments, stopping segment reader") 142 return 143 } 144 } 145 } 146 }() 147 148 segCount := 0 149 150 // nextFile is the primary loop that pops off a file, creates new demuxer elements for it, 151 // and pushes into the pipeline 152 var nextFile func() 153 nextFile = func() { 154 mySegCount := segCount 155 segCount += 1 156 segDone := make(chan struct{}) 157 log.Debug(ctx, "moving to next file", "segCount", mySegCount) 158 pr, pw := io.Pipe() 159 go func() { 160 select { 161 case <-ctx.Done(): 162 pr.Close() 163 pw.Close() 164 return 165 case bs := <-allFiles: 166 if len(bs) == 0 { 167 log.Warn(ctx, "no more segments, ending stream") 168 pr.Close() 169 pw.Close() 170 cancel() 171 return 172 } 173 _, err = io.Copy(pw, bytes.NewReader(bs)) 174 if err != nil { 175 log.Error(ctx, "failed to copy segment file", "error", err) 176 cancel() 177 return 178 } 179 return 180 } 181 }() 182 183 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{ 184 "name": fmt.Sprintf("concat-demux-%d", mySegCount), 185 }) 186 if err != nil { 187 log.Error(ctx, "failed to create demux element", "error", err) 188 cancel() 189 return 190 } 191 192 err = pipeline.Add(demux) 193 if err != nil { 194 log.Error(ctx, "failed to add demux to pipeline", "error", err) 195 cancel() 196 return 197 } 198 199 demuxSinkPad := demux.GetStaticPad("sink") 200 if demuxSinkPad == nil { 201 log.Error(ctx, "failed to get demux sink pad") 202 cancel() 203 return 204 } 205 206 mu := sync.Mutex{} 207 count := 0 208 _, err = demux.Connect("pad-added", func(self *gst.Element, pad *gst.Pad) { 209 mu.Lock() 210 count += 1 211 mu.Unlock() 212 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection()) 213 var downstreamPad *gst.Pad 214 if strings.HasPrefix(pad.GetName(), "video_") { 215 downstreamPad = inputQueuePadVideoSink 216 } else if strings.HasPrefix(pad.GetName(), "audio_") { 217 downstreamPad = inputQueuePadAudioSink 218 } else { 219 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection()) 220 cancel() 221 return 222 } 223 ret := pad.Link(downstreamPad) 224 if ret != gst.PadLinkOK { 225 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret) 226 cancel() 227 return 228 } 229 if pad.GetDirection() == gst.PadDirectionSource { 230 pad.AddProbe(gst.PadProbeTypeEventBoth, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 231 if info.GetEvent().Type() != gst.EventTypeEOS { 232 return gst.PadProbeOK 233 } 234 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection()) 235 pad.Unlink(downstreamPad) 236 mu.Lock() 237 defer mu.Unlock() 238 count -= 1 239 240 if count == 0 { 241 // don't keep going if our context is done 242 if ctx.Err() == nil { 243 go nextFile() 244 segDone <- struct{}{} 245 } 246 } else { 247 log.Debug(ctx, "demux has more pads, waiting for them to close") 248 } 249 return gst.PadProbeRemove 250 }) 251 } 252 }) 253 if err != nil { 254 log.Error(ctx, "failed to connect demux pad-added", "error", err) 255 cancel() 256 return 257 } 258 259 appsrc, err := gst.NewElementWithProperties("appsrc", map[string]any{ 260 "name": fmt.Sprintf("concat-appsrc-%d", mySegCount), 261 "is-live": true, 262 }) 263 if err != nil { 264 log.Error(ctx, "failed to get appsrc element from pipeline", "error", err) 265 cancel() 266 return 267 } 268 269 src := app.SrcFromElement(appsrc) 270 271 appSrcPad := appsrc.GetStaticPad("src") 272 if appSrcPad == nil { 273 log.Error(ctx, "failed to get appsrc pad") 274 cancel() 275 return 276 } 277 278 done := func() { 279 // appsrc.Unlink(demux) 280 pads, err := src.GetPads() 281 if err != nil { 282 log.Error(ctx, "failed to get pads", "error", err) 283 cancel() 284 return 285 } 286 for _, pad := range pads { 287 log.Debug(ctx, "setting pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 288 289 pad.AddProbe(gst.PadProbeTypeIdle, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 290 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 291 src.EndStream() 292 return gst.PadProbeRemove 293 }) 294 } 295 } 296 297 src.SetAutomaticEOS(false) 298 src.SetCallbacks(&app.SourceCallbacks{ 299 NeedDataFunc: func(self *app.Source, length uint) { 300 bs := make([]byte, length) 301 read, err := pr.Read(bs) 302 if err != nil { 303 if errors.Is(err, io.EOF) { 304 if read > 0 { 305 log.Debug(ctx, "got data on eof???") 306 cancel() 307 return 308 } 309 log.Debug(ctx, "EOF, ending segment", "length", read) 310 done() 311 return 312 } else { 313 log.Debug(ctx, "failed to read data, ending stream", "error", err) 314 cancel() 315 return 316 } 317 } 318 toPush := bs 319 if uint(read) < length { 320 toPush = bs[:read] 321 } 322 buffer := gst.NewBufferWithSize(int64(len(toPush))) 323 buffer.Map(gst.MapWrite).WriteData(toPush) 324 defer buffer.Unmap() 325 self.PushBuffer(buffer) 326 327 if uint(read) < length { 328 log.Debug(ctx, "short write, ending segment", "length", read) 329 done() 330 } 331 }, 332 }) 333 err = pipeline.Add(appsrc) 334 if err != nil { 335 log.Error(ctx, "failed to add appsrc to pipeline", "error", err) 336 cancel() 337 return 338 } 339 340 ret := appSrcPad.Link(demuxSinkPad) 341 if ret != gst.PadLinkOK { 342 log.Error(ctx, "failed to link appsrc to demux", "error", ret) 343 cancel() 344 return 345 } 346 347 err = demux.SetState(gst.StatePlaying) 348 if err != nil { 349 log.Error(ctx, "failed to set demux state", "error", err) 350 cancel() 351 return 352 } 353 err = appsrc.SetState(gst.StatePlaying) 354 if err != nil { 355 log.Error(ctx, "failed to set appsrc state", "error", err) 356 cancel() 357 return 358 } 359 360 select { 361 case <-ctx.Done(): 362 return 363 case <-segDone: 364 } 365 366 log.Debug(ctx, "ending segment") 367 if err := demux.SetState(gst.StateNull); err != nil { 368 log.Error(ctx, "failed to set demux state", "error", err) 369 return 370 } 371 src.SetCallbacks(&app.SourceCallbacks{}) 372 if err := appsrc.SetState(gst.StateNull); err != nil { 373 log.Error(ctx, "failed to set appsrc state", "error", err) 374 return 375 } 376 if err := pipeline.Remove(demux); err != nil { 377 log.Error(ctx, "failed to remove demux from pipleine", "error", err) 378 return 379 } 380 if err := pipeline.Remove(appsrc); err != nil { 381 log.Error(ctx, "failed to remove appsrc from pipleine", "error", err) 382 return 383 } 384 pr.Close() 385 pw.Close() 386 } 387 388 // fire it up! 389 go nextFile() 390 391 return outputQueue, ctx.Done(), nil 392}