Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/postgres 149 lines 4.0 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strings" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/log" 13) 14 15// silly technique to avoid leaking pads 16func doNothing(self *gst.Element, pad *gst.Pad) {} 17 18func ConcatDemuxBin(ctx context.Context, seg *bus.Seg) (*gst.Bin, error) { 19 ctx = log.WithLogValues(ctx, "func", "SegDemuxBin") 20 bin := gst.NewBin("seg-demux-bin") 21 22 appSrc, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{ 23 "name": "concat-appsrc", 24 }) 25 if err != nil { 26 return nil, fmt.Errorf("failed to create appsrc element: %w", err) 27 } 28 err = bin.Add(appSrc) 29 if err != nil { 30 return nil, fmt.Errorf("failed to add appsrc to bin: %w", err) 31 } 32 33 demux, err := gst.NewElementWithProperties("qtdemux", map[string]interface{}{ 34 "name": "concat-demux", 35 }) 36 if err != nil { 37 return nil, fmt.Errorf("failed to create qtdemux element: %w", err) 38 } 39 err = bin.Add(demux) 40 if err != nil { 41 return nil, fmt.Errorf("failed to add qtdemux to bin: %w", err) 42 } 43 44 err = appSrc.Link(demux) 45 if err != nil { 46 return nil, fmt.Errorf("failed to link appsrc to qtdemux: %w", err) 47 } 48 49 tmpl := demux.GetPadTemplates() 50 if tmpl == nil { 51 return nil, fmt.Errorf("pad templates not found") 52 } 53 54 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{ 55 "name": "concat-demux-multiqueue", 56 }) 57 if err != nil { 58 return nil, fmt.Errorf("failed to create multiqueue element: %w", err) 59 } 60 err = bin.Add(mq) 61 if err != nil { 62 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err) 63 } 64 65 mqVideoSink := mq.GetRequestPad("sink_%u") 66 if mqVideoSink == nil { 67 return nil, fmt.Errorf("video sink pad not found") 68 } 69 70 mqAudioSink := mq.GetRequestPad("sink_%u") 71 if mqAudioSink == nil { 72 return nil, fmt.Errorf("audio sink pad not found") 73 } 74 75 mqVideoSrc := mq.GetStaticPad("src_0") 76 if mqVideoSrc == nil { 77 return nil, fmt.Errorf("video source pad not found") 78 } 79 80 mqAudioSrc := mq.GetStaticPad("src_1") 81 if mqAudioSrc == nil { 82 return nil, fmt.Errorf("audio source pad not found") 83 } 84 85 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc) 86 if videoGhost == nil { 87 return nil, fmt.Errorf("failed to create video ghost pad") 88 } 89 90 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc) 91 if audioGhost == nil { 92 return nil, fmt.Errorf("failed to create audio ghost pad") 93 } 94 95 needed := 2 96 97 var padAdded func(self *gst.Element, pad *gst.Pad) 98 // the defer funcs are needed to avoid leaking pads for some reason 99 padAdded = func(self *gst.Element, pad *gst.Pad) { 100 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection()) 101 var downstreamPad *gst.Pad 102 if strings.HasPrefix(pad.GetName(), "video_") { 103 downstreamPad = mqVideoSink 104 // defer func() { mqVideoSink = nil }() 105 } else if strings.HasPrefix(pad.GetName(), "audio_") { 106 downstreamPad = mqAudioSink 107 // defer func() { mqAudioSink = nil }() 108 } else { 109 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection()) 110 // cancel() 111 return 112 } 113 ret := pad.Link(downstreamPad) 114 if ret != gst.PadLinkOK { 115 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret) 116 // cancel() 117 return 118 } 119 needed-- 120 if needed == 0 { 121 padAdded = doNothing 122 } 123 } 124 outerPadAdded := func(self *gst.Element, pad *gst.Pad) { 125 padAdded(self, pad) 126 } 127 128 _, err = demux.Connect("pad-added", outerPadAdded) 129 if err != nil { 130 return nil, fmt.Errorf("failed to connect demux pad-added signal: %w", err) 131 } 132 133 ok := bin.AddPad(videoGhost.Pad) 134 if !ok { 135 return nil, fmt.Errorf("failed to add video ghost pad to bin") 136 } 137 138 ok = bin.AddPad(audioGhost.Pad) 139 if !ok { 140 return nil, fmt.Errorf("failed to add audio ghost pad to bin") 141 } 142 143 src := app.SrcFromElement(appSrc) 144 src.SetCallbacks(&app.SourceCallbacks{ 145 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(seg.Data)), 146 }) 147 148 return bin, nil 149}