Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/analytics 231 lines 6.9 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strings" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/log" 13) 14 15// silly technique to avoid leaking pads 16func doNothing(self *gst.Element, pad *gst.Pad) {} 17 18// Function for demuxing a single segment. Needs to be handled very carefully. 19// In particular: users of this MUST cancel the passed context when they're 20// done with the bin. 21func ConcatDemuxBin(ctx context.Context, seg *bus.Seg, doH264Parse bool) (*gst.Bin, error) { 22 ctx = log.WithLogValues(ctx, "func", "ConcatDemuxBin") 23 bin := gst.NewBin("seg-demux-bin") 24 25 appSrc, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{ 26 "name": "concat-appsrc", 27 }) 28 if err != nil { 29 return nil, fmt.Errorf("failed to create appsrc element: %w", err) 30 } 31 err = bin.Add(appSrc) 32 if err != nil { 33 return nil, fmt.Errorf("failed to add appsrc to bin: %w", err) 34 } 35 36 demux, err := gst.NewElementWithProperties("qtdemux", map[string]interface{}{ 37 "name": "concat-demux", 38 }) 39 if err != nil { 40 return nil, fmt.Errorf("failed to create qtdemux element: %w", err) 41 } 42 err = bin.Add(demux) 43 if err != nil { 44 return nil, fmt.Errorf("failed to add qtdemux to bin: %w", err) 45 } 46 47 err = appSrc.Link(demux) 48 if err != nil { 49 return nil, fmt.Errorf("failed to link appsrc to qtdemux: %w", err) 50 } 51 52 tmpl := demux.GetPadTemplates() 53 if tmpl == nil { 54 return nil, fmt.Errorf("pad templates not found") 55 } 56 57 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{ 58 "name": "concat-demux-multiqueue", 59 // "max-size-time": uint(0), // default: 2000000000, 2 seconds 60 // "max-size-bytes": uint(0), // default: 10485760, 10MiB 61 // "max-size-buffers": uint(0), // default: 5, 5 buffers 62 }) 63 if err != nil { 64 return nil, fmt.Errorf("failed to create multiqueue element: %w", err) 65 } 66 err = bin.Add(mq) 67 if err != nil { 68 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err) 69 } 70 // err = mq.SetProperty("max-size-time", uint64(200000000000)) 71 // if err != nil { 72 // return nil, fmt.Errorf("failed to set max-size-time: %w", err) 73 // } 74 // err = mq.SetProperty("max-size-bytes", uint(1048576000)) 75 // if err != nil { 76 // return nil, fmt.Errorf("failed to set max-size-bytes: %w", err) 77 // } 78 // err = mq.SetProperty("max-size-buffers", uint(500)) 79 // if err != nil { 80 // return nil, fmt.Errorf("failed to set max-size-buffers: %w", err) 81 // } 82 83 opusparse, err := gst.NewElementWithProperties("opusparse", map[string]interface{}{ 84 "name": "concat-demux-opusparse", 85 "disable-passthrough": true, 86 }) 87 if err != nil { 88 return nil, fmt.Errorf("failed to create opusparse element: %w", err) 89 } 90 err = bin.Add(opusparse) 91 if err != nil { 92 return nil, fmt.Errorf("failed to add opusparse to bin: %w", err) 93 } 94 opusparseSinkPad := opusparse.GetStaticPad("sink") 95 if opusparseSinkPad == nil { 96 return nil, fmt.Errorf("failed to get opusparse sink pad") 97 } 98 opusparseSrcPad := opusparse.GetStaticPad("src") 99 if opusparseSrcPad == nil { 100 return nil, fmt.Errorf("failed to get opusparse source pad") 101 } 102 103 mqVideoSink := mq.GetRequestPad("sink_%u") 104 if mqVideoSink == nil { 105 return nil, fmt.Errorf("video sink pad not found") 106 } 107 108 mqAudioSink := mq.GetRequestPad("sink_%u") 109 if mqAudioSink == nil { 110 return nil, fmt.Errorf("audio sink pad not found") 111 } 112 113 mqVideoSrc := mq.GetStaticPad("src_0") 114 if mqVideoSrc == nil { 115 return nil, fmt.Errorf("video source pad not found") 116 } 117 118 mqAudioSrc := mq.GetStaticPad("src_1") 119 if mqAudioSrc == nil { 120 return nil, fmt.Errorf("audio source pad not found") 121 } 122 123 linked := mqAudioSrc.Link(opusparseSinkPad) 124 if linked != gst.PadLinkOK { 125 return nil, fmt.Errorf("failed to link opusparse sink pad to mq audio sink pad") 126 } 127 128 var videoGhost *gst.GhostPad 129 if doH264Parse { 130 h264parse, err := gst.NewElementWithProperties("h264parse", map[string]interface{}{ 131 "name": "concat-demux-h264parse", 132 "config-interval": -1, 133 "disable-passthrough": true, 134 }) 135 if err != nil { 136 return nil, fmt.Errorf("failed to create h264parse element: %w", err) 137 } 138 err = bin.Add(h264parse) 139 if err != nil { 140 return nil, fmt.Errorf("failed to add h264parse to bin: %w", err) 141 } 142 h264parseSinkPad := h264parse.GetStaticPad("sink") 143 if h264parseSinkPad == nil { 144 return nil, fmt.Errorf("failed to get h264parse sink pad") 145 } 146 h264parseSrcPad := h264parse.GetStaticPad("src") 147 if h264parseSrcPad == nil { 148 return nil, fmt.Errorf("failed to get h264parse source pad") 149 } 150 linked := mqVideoSrc.Link(h264parseSinkPad) 151 if linked != gst.PadLinkOK { 152 return nil, fmt.Errorf("failed to link h264parse sink pad to mq video sink pad") 153 } 154 155 videoGhost = gst.NewGhostPad("video_0", h264parseSrcPad) 156 if videoGhost == nil { 157 return nil, fmt.Errorf("failed to create video ghost pad") 158 } 159 } else { 160 videoGhost = gst.NewGhostPad("video_0", mqVideoSrc) 161 if videoGhost == nil { 162 return nil, fmt.Errorf("failed to create video ghost pad") 163 } 164 } 165 166 audioGhost := gst.NewGhostPad("audio_0", opusparseSrcPad) 167 if audioGhost == nil { 168 return nil, fmt.Errorf("failed to create audio ghost pad") 169 } 170 171 needed := 2 172 173 var padAdded func(self *gst.Element, pad *gst.Pad) 174 // the defer funcs are needed to avoid leaking pads for some reason 175 padAdded = func(self *gst.Element, pad *gst.Pad) { 176 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection()) 177 var downstreamPad *gst.Pad 178 if strings.HasPrefix(pad.GetName(), "video_") { 179 downstreamPad = mqVideoSink 180 } else if strings.HasPrefix(pad.GetName(), "audio_") { 181 downstreamPad = mqAudioSink 182 } else { 183 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection()) 184 // cancel() 185 return 186 } 187 ret := pad.Link(downstreamPad) 188 if ret != gst.PadLinkOK { 189 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret) 190 // cancel() 191 return 192 } 193 needed-- 194 if needed == 0 { 195 padAdded = doNothing 196 } 197 } 198 outerPadAdded := func(self *gst.Element, pad *gst.Pad) { 199 padAdded(self, pad) 200 } 201 202 // Necessary to avoid leaking `mqVideoSink` and `mqAudioSink` from the 203 // pad-added function in the case where we hit invalid data and 204 // pad-added never fires. 205 go func() { 206 <-ctx.Done() 207 padAdded = doNothing 208 }() 209 210 _, err = demux.Connect("pad-added", outerPadAdded) 211 if err != nil { 212 return nil, fmt.Errorf("failed to connect demux pad-added signal: %w", err) 213 } 214 215 ok := bin.AddPad(videoGhost.Pad) 216 if !ok { 217 return nil, fmt.Errorf("failed to add video ghost pad to bin") 218 } 219 220 ok = bin.AddPad(audioGhost.Pad) 221 if !ok { 222 return nil, fmt.Errorf("failed to add audio ghost pad to bin") 223 } 224 225 src := app.SrcFromElement(appSrc) 226 src.SetCallbacks(&app.SourceCallbacks{ 227 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(seg.Data)), 228 }) 229 230 return bin, nil 231}