Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "strings"
10 "sync"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/media/segchanman"
16)
17
18type ConcatStreamer interface {
19 SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg
20 UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg)
21}
22
23// This function remains in scope for the duration of a single users' playback
24func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, rendition string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) {
25 ctx = log.WithLogValues(ctx, "func", "ConcatStream")
26 ctx, cancel := context.WithCancel(ctx)
27
28 // make 1000000000000 elements!
29
30 // input multiqueue
31 inputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{})
32 if err != nil {
33 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err)
34 }
35 err = pipeline.Add(inputQueue)
36 if err != nil {
37 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err)
38 }
39 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u")
40 if inputQueuePadVideoSink == nil {
41 return nil, nil, fmt.Errorf("failed to get input queue video sink pad")
42 }
43 inputQueuePadAudioSink := inputQueue.GetRequestPad("sink_%u")
44 if inputQueuePadAudioSink == nil {
45 return nil, nil, fmt.Errorf("failed to get input queue audio sink pad")
46 }
47 inputQueuePadVideoSrc := inputQueue.GetStaticPad("src_0")
48 if inputQueuePadVideoSrc == nil {
49 return nil, nil, fmt.Errorf("failed to get input queue video src pad")
50 }
51 inputQueuePadAudioSrc := inputQueue.GetStaticPad("src_1")
52 if inputQueuePadAudioSrc == nil {
53 return nil, nil, fmt.Errorf("failed to get input queue audio src pad")
54 }
55 // streamsynchronizer
56 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{})
57 if err != nil {
58 return nil, nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err)
59 }
60
61 err = pipeline.Add(streamsynchronizer)
62 if err != nil {
63 return nil, nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err)
64 }
65 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u")
66 if syncPadVideoSink == nil {
67 return nil, nil, fmt.Errorf("failed to get sync video sink pad")
68 }
69 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u")
70 if syncPadAudioSink == nil {
71 return nil, nil, fmt.Errorf("failed to get sync audio sink pad")
72 }
73 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0")
74 if syncPadVideoSrc == nil {
75 return nil, nil, fmt.Errorf("failed to get sync video src pad")
76 }
77 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1")
78 if syncPadAudioSrc == nil {
79 return nil, nil, fmt.Errorf("failed to get sync audio src pad")
80 }
81
82 // output multiqueue
83 outputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{
84 "name": "concat-output-queue",
85 })
86 if err != nil {
87 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err)
88 }
89 err = pipeline.Add(outputQueue)
90 if err != nil {
91 return nil, nil, fmt.Errorf("failed to add output multiqueue to pipeline: %w", err)
92 }
93 outputQueuePadVideoSink := outputQueue.GetRequestPad("sink_%u")
94 if outputQueuePadVideoSink == nil {
95 return nil, nil, fmt.Errorf("failed to get output queue video sink pad")
96 }
97 outputQueuePadAudioSink := outputQueue.GetRequestPad("sink_%u")
98 if outputQueuePadAudioSink == nil {
99 return nil, nil, fmt.Errorf("failed to get output queue audio sink pad")
100 }
101 // linking
102
103 // input queue to streamsynchronizer
104 ret := inputQueuePadVideoSrc.Link(syncPadVideoSink)
105 if ret != gst.PadLinkOK {
106 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret)
107 }
108 ret = inputQueuePadAudioSrc.Link(syncPadAudioSink)
109 if ret != gst.PadLinkOK {
110 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret)
111 }
112
113 // streamsynchronizer to output queue
114 ret = syncPadVideoSrc.Link(outputQueuePadVideoSink)
115 if ret != gst.PadLinkOK {
116 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret)
117 }
118 ret = syncPadAudioSrc.Link(outputQueuePadAudioSink)
119 if ret != gst.PadLinkOK {
120 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret)
121 }
122
123 // ok now we can start looping over input files
124
125 // this goroutine will read all the files from the segment queue and buffer
126 // them in a pipe so that we don't miss any in between iterations of the output
127 allFiles := make(chan []byte, 1024)
128 go func() {
129 ch := streamer.SubscribeSegment(ctx, user, rendition)
130 defer streamer.UnsubscribeSegment(ctx, user, rendition, ch)
131 for {
132 select {
133 case <-ctx.Done():
134 log.Debug(ctx, "exiting segment reader")
135 return
136 case file := <-ch:
137 log.Debug(ctx, "got segment", "file", file.Filepath)
138 allFiles <- file.Data
139 if len(file.Data) == 0 {
140 log.Warn(ctx, "no more segments, stopping segment reader")
141 return
142 }
143 }
144 }
145 }()
146
147 segCount := 0
148
149 // nextFile is the primary loop that pops off a file, creates new demuxer elements for it,
150 // and pushes into the pipeline
151 var nextFile func()
152 nextFile = func() {
153 mySegCount := segCount
154 segCount += 1
155 segDone := make(chan struct{})
156 log.Debug(ctx, "moving to next file", "segCount", mySegCount)
157 pr, pw := io.Pipe()
158 go func() {
159 select {
160 case <-ctx.Done():
161 pr.Close()
162 pw.Close()
163 return
164 case bs := <-allFiles:
165 if len(bs) == 0 {
166 log.Warn(ctx, "no more segments, ending stream")
167 pr.Close()
168 pw.Close()
169 cancel()
170 return
171 }
172 _, err = io.Copy(pw, bytes.NewReader(bs))
173 if err != nil {
174 log.Error(ctx, "failed to copy segment file", "error", err)
175 cancel()
176 return
177 }
178 return
179 }
180 }()
181
182 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{
183 "name": fmt.Sprintf("concat-demux-%d", mySegCount),
184 })
185 if err != nil {
186 log.Error(ctx, "failed to create demux element", "error", err)
187 cancel()
188 return
189 }
190
191 err = pipeline.Add(demux)
192 if err != nil {
193 log.Error(ctx, "failed to add demux to pipeline", "error", err)
194 cancel()
195 return
196 }
197
198 demuxSinkPad := demux.GetStaticPad("sink")
199 if demuxSinkPad == nil {
200 log.Error(ctx, "failed to get demux sink pad")
201 cancel()
202 return
203 }
204
205 mu := sync.Mutex{}
206 count := 0
207 _, err = demux.Connect("pad-added", func(self *gst.Element, pad *gst.Pad) {
208 mu.Lock()
209 count += 1
210 mu.Unlock()
211 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection())
212 var downstreamPad *gst.Pad
213 if strings.HasPrefix(pad.GetName(), "video_") {
214 downstreamPad = inputQueuePadVideoSink
215 } else if strings.HasPrefix(pad.GetName(), "audio_") {
216 downstreamPad = inputQueuePadAudioSink
217 } else {
218 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection())
219 cancel()
220 return
221 }
222 ret := pad.Link(downstreamPad)
223 if ret != gst.PadLinkOK {
224 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret)
225 cancel()
226 return
227 }
228 if pad.GetDirection() == gst.PadDirectionSource {
229 pad.AddProbe(gst.PadProbeTypeEventBoth, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
230 if info.GetEvent().Type() != gst.EventTypeEOS {
231 return gst.PadProbeOK
232 }
233 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection())
234 pad.Unlink(downstreamPad)
235 mu.Lock()
236 defer mu.Unlock()
237 count -= 1
238
239 if count == 0 {
240 // don't keep going if our context is done
241 if ctx.Err() == nil {
242 go nextFile()
243 segDone <- struct{}{}
244 }
245 } else {
246 log.Debug(ctx, "demux has more pads, waiting for them to close")
247 }
248 return gst.PadProbeRemove
249 })
250 }
251 })
252 if err != nil {
253 log.Error(ctx, "failed to connect demux pad-added", "error", err)
254 cancel()
255 return
256 }
257
258 appsrc, err := gst.NewElementWithProperties("appsrc", map[string]any{
259 "name": fmt.Sprintf("concat-appsrc-%d", mySegCount),
260 "is-live": true,
261 })
262 if err != nil {
263 log.Error(ctx, "failed to get appsrc element from pipeline", "error", err)
264 cancel()
265 return
266 }
267
268 src := app.SrcFromElement(appsrc)
269
270 appSrcPad := appsrc.GetStaticPad("src")
271 if appSrcPad == nil {
272 log.Error(ctx, "failed to get appsrc pad")
273 cancel()
274 return
275 }
276
277 done := func() {
278 // appsrc.Unlink(demux)
279 pads, err := src.GetPads()
280 if err != nil {
281 log.Error(ctx, "failed to get pads", "error", err)
282 cancel()
283 return
284 }
285 for _, pad := range pads {
286 log.Debug(ctx, "setting pad-idle", "name", pad.GetName(), "direction", pad.GetDirection())
287
288 pad.AddProbe(gst.PadProbeTypeIdle, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
289 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection())
290 src.EndStream()
291 return gst.PadProbeRemove
292 })
293 }
294 }
295
296 src.SetAutomaticEOS(false)
297 src.SetCallbacks(&app.SourceCallbacks{
298 NeedDataFunc: func(self *app.Source, length uint) {
299 bs := make([]byte, length)
300 read, err := pr.Read(bs)
301 if err != nil {
302 if errors.Is(err, io.EOF) {
303 if read > 0 {
304 log.Debug(ctx, "got data on eof???")
305 cancel()
306 return
307 }
308 log.Debug(ctx, "EOF, ending segment", "length", read)
309 done()
310 return
311 } else {
312 log.Debug(ctx, "failed to read data, ending stream", "error", err)
313 cancel()
314 return
315 }
316 }
317 toPush := bs
318 if uint(read) < length {
319 toPush = bs[:read]
320 }
321 buffer := gst.NewBufferWithSize(int64(len(toPush)))
322 buffer.Map(gst.MapWrite).WriteData(toPush)
323 defer buffer.Unmap()
324 self.PushBuffer(buffer)
325
326 if uint(read) < length {
327 log.Debug(ctx, "short write, ending segment", "length", read)
328 done()
329 }
330 },
331 })
332 err = pipeline.Add(appsrc)
333 if err != nil {
334 log.Error(ctx, "failed to add appsrc to pipeline", "error", err)
335 cancel()
336 return
337 }
338
339 ret := appSrcPad.Link(demuxSinkPad)
340 if ret != gst.PadLinkOK {
341 log.Error(ctx, "failed to link appsrc to demux", "error", ret)
342 cancel()
343 return
344 }
345
346 err = demux.SetState(gst.StatePlaying)
347 if err != nil {
348 log.Error(ctx, "failed to set demux state", "error", err)
349 cancel()
350 return
351 }
352 err = appsrc.SetState(gst.StatePlaying)
353 if err != nil {
354 log.Error(ctx, "failed to set appsrc state", "error", err)
355 cancel()
356 return
357 }
358
359 select {
360 case <-ctx.Done():
361 return
362 case <-segDone:
363 }
364
365 log.Debug(ctx, "ending segment")
366 demux.SetState(gst.StateNull)
367 src.SetCallbacks(&app.SourceCallbacks{})
368 appsrc.SetState(gst.StateNull)
369 pipeline.Remove(demux)
370 pipeline.Remove(appsrc)
371 pr.Close()
372 pw.Close()
373 }
374
375 // fire it up!
376 go nextFile()
377
378 return outputQueue, ctx.Done(), nil
379}