Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/rtmp-preferred 379 lines 11 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "strings" 10 "sync" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/media/segchanman" 16) 17 18type ConcatStreamer interface { 19 SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg 20 UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg) 21} 22 23// This function remains in scope for the duration of a single users' playback 24func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, rendition string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) { 25 ctx = log.WithLogValues(ctx, "func", "ConcatStream") 26 ctx, cancel := context.WithCancel(ctx) 27 28 // make 1000000000000 elements! 29 30 // input multiqueue 31 inputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{}) 32 if err != nil { 33 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err) 34 } 35 err = pipeline.Add(inputQueue) 36 if err != nil { 37 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err) 38 } 39 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u") 40 if inputQueuePadVideoSink == nil { 41 return nil, nil, fmt.Errorf("failed to get input queue video sink pad") 42 } 43 inputQueuePadAudioSink := inputQueue.GetRequestPad("sink_%u") 44 if inputQueuePadAudioSink == nil { 45 return nil, nil, fmt.Errorf("failed to get input queue audio sink pad") 46 } 47 inputQueuePadVideoSrc := inputQueue.GetStaticPad("src_0") 48 if inputQueuePadVideoSrc == nil { 49 return nil, nil, fmt.Errorf("failed to get input queue video src pad") 50 } 51 inputQueuePadAudioSrc := inputQueue.GetStaticPad("src_1") 52 if inputQueuePadAudioSrc == nil { 53 return nil, nil, fmt.Errorf("failed to get input queue audio src pad") 54 } 55 // streamsynchronizer 56 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{}) 57 if err != nil { 58 return nil, nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 59 } 60 61 err = pipeline.Add(streamsynchronizer) 62 if err != nil { 63 return nil, nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) 64 } 65 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u") 66 if syncPadVideoSink == nil { 67 return nil, nil, fmt.Errorf("failed to get sync video sink pad") 68 } 69 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u") 70 if syncPadAudioSink == nil { 71 return nil, nil, fmt.Errorf("failed to get sync audio sink pad") 72 } 73 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0") 74 if syncPadVideoSrc == nil { 75 return nil, nil, fmt.Errorf("failed to get sync video src pad") 76 } 77 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1") 78 if syncPadAudioSrc == nil { 79 return nil, nil, fmt.Errorf("failed to get sync audio src pad") 80 } 81 82 // output multiqueue 83 outputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{ 84 "name": "concat-output-queue", 85 }) 86 if err != nil { 87 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err) 88 } 89 err = pipeline.Add(outputQueue) 90 if err != nil { 91 return nil, nil, fmt.Errorf("failed to add output multiqueue to pipeline: %w", err) 92 } 93 outputQueuePadVideoSink := outputQueue.GetRequestPad("sink_%u") 94 if outputQueuePadVideoSink == nil { 95 return nil, nil, fmt.Errorf("failed to get output queue video sink pad") 96 } 97 outputQueuePadAudioSink := outputQueue.GetRequestPad("sink_%u") 98 if outputQueuePadAudioSink == nil { 99 return nil, nil, fmt.Errorf("failed to get output queue audio sink pad") 100 } 101 // linking 102 103 // input queue to streamsynchronizer 104 ret := inputQueuePadVideoSrc.Link(syncPadVideoSink) 105 if ret != gst.PadLinkOK { 106 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret) 107 } 108 ret = inputQueuePadAudioSrc.Link(syncPadAudioSink) 109 if ret != gst.PadLinkOK { 110 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret) 111 } 112 113 // streamsynchronizer to output queue 114 ret = syncPadVideoSrc.Link(outputQueuePadVideoSink) 115 if ret != gst.PadLinkOK { 116 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret) 117 } 118 ret = syncPadAudioSrc.Link(outputQueuePadAudioSink) 119 if ret != gst.PadLinkOK { 120 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret) 121 } 122 123 // ok now we can start looping over input files 124 125 // this goroutine will read all the files from the segment queue and buffer 126 // them in a pipe so that we don't miss any in between iterations of the output 127 allFiles := make(chan []byte, 1024) 128 go func() { 129 ch := streamer.SubscribeSegment(ctx, user, rendition) 130 defer streamer.UnsubscribeSegment(ctx, user, rendition, ch) 131 for { 132 select { 133 case <-ctx.Done(): 134 log.Debug(ctx, "exiting segment reader") 135 return 136 case file := <-ch: 137 log.Debug(ctx, "got segment", "file", file.Filepath) 138 allFiles <- file.Data 139 if len(file.Data) == 0 { 140 log.Warn(ctx, "no more segments, stopping segment reader") 141 return 142 } 143 } 144 } 145 }() 146 147 segCount := 0 148 149 // nextFile is the primary loop that pops off a file, creates new demuxer elements for it, 150 // and pushes into the pipeline 151 var nextFile func() 152 nextFile = func() { 153 mySegCount := segCount 154 segCount += 1 155 segDone := make(chan struct{}) 156 log.Debug(ctx, "moving to next file", "segCount", mySegCount) 157 pr, pw := io.Pipe() 158 go func() { 159 select { 160 case <-ctx.Done(): 161 pr.Close() 162 pw.Close() 163 return 164 case bs := <-allFiles: 165 if len(bs) == 0 { 166 log.Warn(ctx, "no more segments, ending stream") 167 pr.Close() 168 pw.Close() 169 cancel() 170 return 171 } 172 _, err = io.Copy(pw, bytes.NewReader(bs)) 173 if err != nil { 174 log.Error(ctx, "failed to copy segment file", "error", err) 175 cancel() 176 return 177 } 178 return 179 } 180 }() 181 182 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{ 183 "name": fmt.Sprintf("concat-demux-%d", mySegCount), 184 }) 185 if err != nil { 186 log.Error(ctx, "failed to create demux element", "error", err) 187 cancel() 188 return 189 } 190 191 err = pipeline.Add(demux) 192 if err != nil { 193 log.Error(ctx, "failed to add demux to pipeline", "error", err) 194 cancel() 195 return 196 } 197 198 demuxSinkPad := demux.GetStaticPad("sink") 199 if demuxSinkPad == nil { 200 log.Error(ctx, "failed to get demux sink pad") 201 cancel() 202 return 203 } 204 205 mu := sync.Mutex{} 206 count := 0 207 _, err = demux.Connect("pad-added", func(self *gst.Element, pad *gst.Pad) { 208 mu.Lock() 209 count += 1 210 mu.Unlock() 211 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection()) 212 var downstreamPad *gst.Pad 213 if strings.HasPrefix(pad.GetName(), "video_") { 214 downstreamPad = inputQueuePadVideoSink 215 } else if strings.HasPrefix(pad.GetName(), "audio_") { 216 downstreamPad = inputQueuePadAudioSink 217 } else { 218 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection()) 219 cancel() 220 return 221 } 222 ret := pad.Link(downstreamPad) 223 if ret != gst.PadLinkOK { 224 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret) 225 cancel() 226 return 227 } 228 if pad.GetDirection() == gst.PadDirectionSource { 229 pad.AddProbe(gst.PadProbeTypeEventBoth, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 230 if info.GetEvent().Type() != gst.EventTypeEOS { 231 return gst.PadProbeOK 232 } 233 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection()) 234 pad.Unlink(downstreamPad) 235 mu.Lock() 236 defer mu.Unlock() 237 count -= 1 238 239 if count == 0 { 240 // don't keep going if our context is done 241 if ctx.Err() == nil { 242 go nextFile() 243 segDone <- struct{}{} 244 } 245 } else { 246 log.Debug(ctx, "demux has more pads, waiting for them to close") 247 } 248 return gst.PadProbeRemove 249 }) 250 } 251 }) 252 if err != nil { 253 log.Error(ctx, "failed to connect demux pad-added", "error", err) 254 cancel() 255 return 256 } 257 258 appsrc, err := gst.NewElementWithProperties("appsrc", map[string]any{ 259 "name": fmt.Sprintf("concat-appsrc-%d", mySegCount), 260 "is-live": true, 261 }) 262 if err != nil { 263 log.Error(ctx, "failed to get appsrc element from pipeline", "error", err) 264 cancel() 265 return 266 } 267 268 src := app.SrcFromElement(appsrc) 269 270 appSrcPad := appsrc.GetStaticPad("src") 271 if appSrcPad == nil { 272 log.Error(ctx, "failed to get appsrc pad") 273 cancel() 274 return 275 } 276 277 done := func() { 278 // appsrc.Unlink(demux) 279 pads, err := src.GetPads() 280 if err != nil { 281 log.Error(ctx, "failed to get pads", "error", err) 282 cancel() 283 return 284 } 285 for _, pad := range pads { 286 log.Debug(ctx, "setting pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 287 288 pad.AddProbe(gst.PadProbeTypeIdle, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 289 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 290 src.EndStream() 291 return gst.PadProbeRemove 292 }) 293 } 294 } 295 296 src.SetAutomaticEOS(false) 297 src.SetCallbacks(&app.SourceCallbacks{ 298 NeedDataFunc: func(self *app.Source, length uint) { 299 bs := make([]byte, length) 300 read, err := pr.Read(bs) 301 if err != nil { 302 if errors.Is(err, io.EOF) { 303 if read > 0 { 304 log.Debug(ctx, "got data on eof???") 305 cancel() 306 return 307 } 308 log.Debug(ctx, "EOF, ending segment", "length", read) 309 done() 310 return 311 } else { 312 log.Debug(ctx, "failed to read data, ending stream", "error", err) 313 cancel() 314 return 315 } 316 } 317 toPush := bs 318 if uint(read) < length { 319 toPush = bs[:read] 320 } 321 buffer := gst.NewBufferWithSize(int64(len(toPush))) 322 buffer.Map(gst.MapWrite).WriteData(toPush) 323 defer buffer.Unmap() 324 self.PushBuffer(buffer) 325 326 if uint(read) < length { 327 log.Debug(ctx, "short write, ending segment", "length", read) 328 done() 329 } 330 }, 331 }) 332 err = pipeline.Add(appsrc) 333 if err != nil { 334 log.Error(ctx, "failed to add appsrc to pipeline", "error", err) 335 cancel() 336 return 337 } 338 339 ret := appSrcPad.Link(demuxSinkPad) 340 if ret != gst.PadLinkOK { 341 log.Error(ctx, "failed to link appsrc to demux", "error", ret) 342 cancel() 343 return 344 } 345 346 err = demux.SetState(gst.StatePlaying) 347 if err != nil { 348 log.Error(ctx, "failed to set demux state", "error", err) 349 cancel() 350 return 351 } 352 err = appsrc.SetState(gst.StatePlaying) 353 if err != nil { 354 log.Error(ctx, "failed to set appsrc state", "error", err) 355 cancel() 356 return 357 } 358 359 select { 360 case <-ctx.Done(): 361 return 362 case <-segDone: 363 } 364 365 log.Debug(ctx, "ending segment") 366 demux.SetState(gst.StateNull) 367 src.SetCallbacks(&app.SourceCallbacks{}) 368 appsrc.SetState(gst.StateNull) 369 pipeline.Remove(demux) 370 pipeline.Remove(appsrc) 371 pr.Close() 372 pw.Close() 373 } 374 375 // fire it up! 376 go nextFile() 377 378 return outputQueue, ctx.Done(), nil 379}