Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.4 158 lines 4.3 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strings" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/log" 13) 14 15// silly technique to avoid leaking pads 16func doNothing(self *gst.Element, pad *gst.Pad) {} 17 18// Function for demuxing a single segment. Needs to be handled very carefully. 19// In particular: users of this MUST cancel the passed context when they're 20// done with the bin. 21func ConcatDemuxBin(ctx context.Context, seg *bus.Seg) (*gst.Bin, error) { 22 ctx = log.WithLogValues(ctx, "func", "SegDemuxBin") 23 bin := gst.NewBin("seg-demux-bin") 24 25 appSrc, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{ 26 "name": "concat-appsrc", 27 }) 28 if err != nil { 29 return nil, fmt.Errorf("failed to create appsrc element: %w", err) 30 } 31 err = bin.Add(appSrc) 32 if err != nil { 33 return nil, fmt.Errorf("failed to add appsrc to bin: %w", err) 34 } 35 36 demux, err := gst.NewElementWithProperties("qtdemux", map[string]interface{}{ 37 "name": "concat-demux", 38 }) 39 if err != nil { 40 return nil, fmt.Errorf("failed to create qtdemux element: %w", err) 41 } 42 err = bin.Add(demux) 43 if err != nil { 44 return nil, fmt.Errorf("failed to add qtdemux to bin: %w", err) 45 } 46 47 err = appSrc.Link(demux) 48 if err != nil { 49 return nil, fmt.Errorf("failed to link appsrc to qtdemux: %w", err) 50 } 51 52 tmpl := demux.GetPadTemplates() 53 if tmpl == nil { 54 return nil, fmt.Errorf("pad templates not found") 55 } 56 57 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{ 58 "name": "concat-demux-multiqueue", 59 }) 60 if err != nil { 61 return nil, fmt.Errorf("failed to create multiqueue element: %w", err) 62 } 63 err = bin.Add(mq) 64 if err != nil { 65 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err) 66 } 67 68 mqVideoSink := mq.GetRequestPad("sink_%u") 69 if mqVideoSink == nil { 70 return nil, fmt.Errorf("video sink pad not found") 71 } 72 73 mqAudioSink := mq.GetRequestPad("sink_%u") 74 if mqAudioSink == nil { 75 return nil, fmt.Errorf("audio sink pad not found") 76 } 77 78 mqVideoSrc := mq.GetStaticPad("src_0") 79 if mqVideoSrc == nil { 80 return nil, fmt.Errorf("video source pad not found") 81 } 82 83 mqAudioSrc := mq.GetStaticPad("src_1") 84 if mqAudioSrc == nil { 85 return nil, fmt.Errorf("audio source pad not found") 86 } 87 88 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc) 89 if videoGhost == nil { 90 return nil, fmt.Errorf("failed to create video ghost pad") 91 } 92 93 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc) 94 if audioGhost == nil { 95 return nil, fmt.Errorf("failed to create audio ghost pad") 96 } 97 98 needed := 2 99 100 var padAdded func(self *gst.Element, pad *gst.Pad) 101 // the defer funcs are needed to avoid leaking pads for some reason 102 padAdded = func(self *gst.Element, pad *gst.Pad) { 103 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection()) 104 var downstreamPad *gst.Pad 105 if strings.HasPrefix(pad.GetName(), "video_") { 106 downstreamPad = mqVideoSink 107 } else if strings.HasPrefix(pad.GetName(), "audio_") { 108 downstreamPad = mqAudioSink 109 } else { 110 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection()) 111 // cancel() 112 return 113 } 114 ret := pad.Link(downstreamPad) 115 if ret != gst.PadLinkOK { 116 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret) 117 // cancel() 118 return 119 } 120 needed-- 121 if needed == 0 { 122 padAdded = doNothing 123 } 124 } 125 outerPadAdded := func(self *gst.Element, pad *gst.Pad) { 126 padAdded(self, pad) 127 } 128 129 // Necessary to avoid leaking `mqVideoSink` and `mqAudioSink` from the 130 // pad-added function in the case where we hit invalid data and 131 // pad-added never fires. 132 go func() { 133 <-ctx.Done() 134 padAdded = doNothing 135 }() 136 137 _, err = demux.Connect("pad-added", outerPadAdded) 138 if err != nil { 139 return nil, fmt.Errorf("failed to connect demux pad-added signal: %w", err) 140 } 141 142 ok := bin.AddPad(videoGhost.Pad) 143 if !ok { 144 return nil, fmt.Errorf("failed to add video ghost pad to bin") 145 } 146 147 ok = bin.AddPad(audioGhost.Pad) 148 if !ok { 149 return nil, fmt.Errorf("failed to add audio ghost pad to bin") 150 } 151 152 src := app.SrcFromElement(appSrc) 153 src.SetCallbacks(&app.SourceCallbacks{ 154 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(seg.Data)), 155 }) 156 157 return bin, nil 158}