Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "strings"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "stream.place/streamplace/pkg/bus"
12 "stream.place/streamplace/pkg/log"
13)
14
15// silly technique to avoid leaking pads
16func doNothing(self *gst.Element, pad *gst.Pad) {}
17
18// Function for demuxing a single segment. Needs to be handled very carefully.
19// In particular: users of this MUST cancel the passed context when they're
20// done with the bin.
21func ConcatDemuxBin(ctx context.Context, seg *bus.Seg, doH264Parse bool) (*gst.Bin, error) {
22 ctx = log.WithLogValues(ctx, "func", "ConcatDemuxBin")
23 bin := gst.NewBin("seg-demux-bin")
24
25 appSrc, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{
26 "name": "concat-appsrc",
27 })
28 if err != nil {
29 return nil, fmt.Errorf("failed to create appsrc element: %w", err)
30 }
31 err = bin.Add(appSrc)
32 if err != nil {
33 return nil, fmt.Errorf("failed to add appsrc to bin: %w", err)
34 }
35
36 demux, err := gst.NewElementWithProperties("qtdemux", map[string]interface{}{
37 "name": "concat-demux",
38 })
39 if err != nil {
40 return nil, fmt.Errorf("failed to create qtdemux element: %w", err)
41 }
42 err = bin.Add(demux)
43 if err != nil {
44 return nil, fmt.Errorf("failed to add qtdemux to bin: %w", err)
45 }
46
47 err = appSrc.Link(demux)
48 if err != nil {
49 return nil, fmt.Errorf("failed to link appsrc to qtdemux: %w", err)
50 }
51
52 tmpl := demux.GetPadTemplates()
53 if tmpl == nil {
54 return nil, fmt.Errorf("pad templates not found")
55 }
56
57 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{
58 "name": "concat-demux-multiqueue",
59 // "max-size-time": uint(0), // default: 2000000000, 2 seconds
60 // "max-size-bytes": uint(0), // default: 10485760, 10MiB
61 // "max-size-buffers": uint(0), // default: 5, 5 buffers
62 })
63 if err != nil {
64 return nil, fmt.Errorf("failed to create multiqueue element: %w", err)
65 }
66 err = bin.Add(mq)
67 if err != nil {
68 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err)
69 }
70 // err = mq.SetProperty("max-size-time", uint64(200000000000))
71 // if err != nil {
72 // return nil, fmt.Errorf("failed to set max-size-time: %w", err)
73 // }
74 // err = mq.SetProperty("max-size-bytes", uint(1048576000))
75 // if err != nil {
76 // return nil, fmt.Errorf("failed to set max-size-bytes: %w", err)
77 // }
78 // err = mq.SetProperty("max-size-buffers", uint(500))
79 // if err != nil {
80 // return nil, fmt.Errorf("failed to set max-size-buffers: %w", err)
81 // }
82
83 opusparse, err := gst.NewElementWithProperties("opusparse", map[string]interface{}{
84 "name": "concat-demux-opusparse",
85 "disable-passthrough": true,
86 })
87 if err != nil {
88 return nil, fmt.Errorf("failed to create opusparse element: %w", err)
89 }
90 err = bin.Add(opusparse)
91 if err != nil {
92 return nil, fmt.Errorf("failed to add opusparse to bin: %w", err)
93 }
94 opusparseSinkPad := opusparse.GetStaticPad("sink")
95 if opusparseSinkPad == nil {
96 return nil, fmt.Errorf("failed to get opusparse sink pad")
97 }
98 opusparseSrcPad := opusparse.GetStaticPad("src")
99 if opusparseSrcPad == nil {
100 return nil, fmt.Errorf("failed to get opusparse source pad")
101 }
102
103 mqVideoSink := mq.GetRequestPad("sink_%u")
104 if mqVideoSink == nil {
105 return nil, fmt.Errorf("video sink pad not found")
106 }
107
108 mqAudioSink := mq.GetRequestPad("sink_%u")
109 if mqAudioSink == nil {
110 return nil, fmt.Errorf("audio sink pad not found")
111 }
112
113 mqVideoSrc := mq.GetStaticPad("src_0")
114 if mqVideoSrc == nil {
115 return nil, fmt.Errorf("video source pad not found")
116 }
117
118 mqAudioSrc := mq.GetStaticPad("src_1")
119 if mqAudioSrc == nil {
120 return nil, fmt.Errorf("audio source pad not found")
121 }
122
123 linked := mqAudioSrc.Link(opusparseSinkPad)
124 if linked != gst.PadLinkOK {
125 return nil, fmt.Errorf("failed to link opusparse sink pad to mq audio sink pad")
126 }
127
128 var videoGhost *gst.GhostPad
129 if doH264Parse {
130 h264parse, err := gst.NewElementWithProperties("h264parse", map[string]interface{}{
131 "name": "concat-demux-h264parse",
132 "config-interval": -1,
133 "disable-passthrough": true,
134 })
135 if err != nil {
136 return nil, fmt.Errorf("failed to create h264parse element: %w", err)
137 }
138 err = bin.Add(h264parse)
139 if err != nil {
140 return nil, fmt.Errorf("failed to add h264parse to bin: %w", err)
141 }
142 h264parseSinkPad := h264parse.GetStaticPad("sink")
143 if h264parseSinkPad == nil {
144 return nil, fmt.Errorf("failed to get h264parse sink pad")
145 }
146 h264parseSrcPad := h264parse.GetStaticPad("src")
147 if h264parseSrcPad == nil {
148 return nil, fmt.Errorf("failed to get h264parse source pad")
149 }
150 linked := mqVideoSrc.Link(h264parseSinkPad)
151 if linked != gst.PadLinkOK {
152 return nil, fmt.Errorf("failed to link h264parse sink pad to mq video sink pad")
153 }
154
155 videoGhost = gst.NewGhostPad("video_0", h264parseSrcPad)
156 if videoGhost == nil {
157 return nil, fmt.Errorf("failed to create video ghost pad")
158 }
159 } else {
160 videoGhost = gst.NewGhostPad("video_0", mqVideoSrc)
161 if videoGhost == nil {
162 return nil, fmt.Errorf("failed to create video ghost pad")
163 }
164 }
165
166 audioGhost := gst.NewGhostPad("audio_0", opusparseSrcPad)
167 if audioGhost == nil {
168 return nil, fmt.Errorf("failed to create audio ghost pad")
169 }
170
171 needed := 2
172
173 var padAdded func(self *gst.Element, pad *gst.Pad)
174 // the defer funcs are needed to avoid leaking pads for some reason
175 padAdded = func(self *gst.Element, pad *gst.Pad) {
176 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection())
177 var downstreamPad *gst.Pad
178 if strings.HasPrefix(pad.GetName(), "video_") {
179 downstreamPad = mqVideoSink
180 } else if strings.HasPrefix(pad.GetName(), "audio_") {
181 downstreamPad = mqAudioSink
182 } else {
183 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection())
184 // cancel()
185 return
186 }
187 ret := pad.Link(downstreamPad)
188 if ret != gst.PadLinkOK {
189 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret)
190 // cancel()
191 return
192 }
193 needed--
194 if needed == 0 {
195 padAdded = doNothing
196 }
197 }
198 outerPadAdded := func(self *gst.Element, pad *gst.Pad) {
199 padAdded(self, pad)
200 }
201
202 // Necessary to avoid leaking `mqVideoSink` and `mqAudioSink` from the
203 // pad-added function in the case where we hit invalid data and
204 // pad-added never fires.
205 go func() {
206 <-ctx.Done()
207 padAdded = doNothing
208 }()
209
210 _, err = demux.Connect("pad-added", outerPadAdded)
211 if err != nil {
212 return nil, fmt.Errorf("failed to connect demux pad-added signal: %w", err)
213 }
214
215 ok := bin.AddPad(videoGhost.Pad)
216 if !ok {
217 return nil, fmt.Errorf("failed to add video ghost pad to bin")
218 }
219
220 ok = bin.AddPad(audioGhost.Pad)
221 if !ok {
222 return nil, fmt.Errorf("failed to add audio ghost pad to bin")
223 }
224
225 src := app.SrcFromElement(appSrc)
226 src.SetCallbacks(&app.SourceCallbacks{
227 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(seg.Data)),
228 })
229
230 return bin, nil
231}