Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "strings"
10 "sync"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "stream.place/streamplace/pkg/bus"
15 "stream.place/streamplace/pkg/log"
16)
17
18type ConcatStreamer interface {
19 SubscribeSegment(ctx context.Context, user string, rendition string) *bus.SegChan
20 UnsubscribeSegment(ctx context.Context, user string, rendition string, ch *bus.SegChan)
21}
22
23// This function remains in scope for the duration of a single users' playback
24func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, rendition string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) {
25 ctx = log.WithLogValues(ctx, "func", "ConcatStream")
26 ctx, cancel := context.WithCancel(ctx)
27 defer cancel()
28
29 // make 1000000000000 elements!
30
31 // input multiqueue
32 inputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{})
33 if err != nil {
34 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err)
35 }
36 err = pipeline.Add(inputQueue)
37 if err != nil {
38 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err)
39 }
40 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u")
41 if inputQueuePadVideoSink == nil {
42 return nil, nil, fmt.Errorf("failed to get input queue video sink pad")
43 }
44 inputQueuePadAudioSink := inputQueue.GetRequestPad("sink_%u")
45 if inputQueuePadAudioSink == nil {
46 return nil, nil, fmt.Errorf("failed to get input queue audio sink pad")
47 }
48 inputQueuePadVideoSrc := inputQueue.GetStaticPad("src_0")
49 if inputQueuePadVideoSrc == nil {
50 return nil, nil, fmt.Errorf("failed to get input queue video src pad")
51 }
52 inputQueuePadAudioSrc := inputQueue.GetStaticPad("src_1")
53 if inputQueuePadAudioSrc == nil {
54 return nil, nil, fmt.Errorf("failed to get input queue audio src pad")
55 }
56 // streamsynchronizer
57 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{})
58 if err != nil {
59 return nil, nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err)
60 }
61
62 err = pipeline.Add(streamsynchronizer)
63 if err != nil {
64 return nil, nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err)
65 }
66 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u")
67 if syncPadVideoSink == nil {
68 return nil, nil, fmt.Errorf("failed to get sync video sink pad")
69 }
70 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u")
71 if syncPadAudioSink == nil {
72 return nil, nil, fmt.Errorf("failed to get sync audio sink pad")
73 }
74 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0")
75 if syncPadVideoSrc == nil {
76 return nil, nil, fmt.Errorf("failed to get sync video src pad")
77 }
78 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1")
79 if syncPadAudioSrc == nil {
80 return nil, nil, fmt.Errorf("failed to get sync audio src pad")
81 }
82
83 // output multiqueue
84 outputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{
85 "name": "concat-output-queue",
86 })
87 if err != nil {
88 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err)
89 }
90 err = pipeline.Add(outputQueue)
91 if err != nil {
92 return nil, nil, fmt.Errorf("failed to add output multiqueue to pipeline: %w", err)
93 }
94 outputQueuePadVideoSink := outputQueue.GetRequestPad("sink_%u")
95 if outputQueuePadVideoSink == nil {
96 return nil, nil, fmt.Errorf("failed to get output queue video sink pad")
97 }
98 outputQueuePadAudioSink := outputQueue.GetRequestPad("sink_%u")
99 if outputQueuePadAudioSink == nil {
100 return nil, nil, fmt.Errorf("failed to get output queue audio sink pad")
101 }
102 // linking
103
104 // input queue to streamsynchronizer
105 ret := inputQueuePadVideoSrc.Link(syncPadVideoSink)
106 if ret != gst.PadLinkOK {
107 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret)
108 }
109 ret = inputQueuePadAudioSrc.Link(syncPadAudioSink)
110 if ret != gst.PadLinkOK {
111 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret)
112 }
113
114 // streamsynchronizer to output queue
115 ret = syncPadVideoSrc.Link(outputQueuePadVideoSink)
116 if ret != gst.PadLinkOK {
117 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret)
118 }
119 ret = syncPadAudioSrc.Link(outputQueuePadAudioSink)
120 if ret != gst.PadLinkOK {
121 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret)
122 }
123
124 // ok now we can start looping over input files
125
126 // this goroutine will read all the files from the segment queue and buffer
127 // them in a pipe so that we don't miss any in between iterations of the output
128 allFiles := make(chan []byte, 1024)
129 go func() {
130 ch := streamer.SubscribeSegment(ctx, user, rendition)
131 defer streamer.UnsubscribeSegment(ctx, user, rendition, ch)
132 for {
133 select {
134 case <-ctx.Done():
135 log.Debug(ctx, "exiting segment reader")
136 return
137 case file := <-ch.C:
138 log.Debug(ctx, "got segment", "file", file.Filepath)
139 allFiles <- file.Data
140 if len(file.Data) == 0 {
141 log.Warn(ctx, "no more segments, stopping segment reader")
142 return
143 }
144 }
145 }
146 }()
147
148 segCount := 0
149
150 // nextFile is the primary loop that pops off a file, creates new demuxer elements for it,
151 // and pushes into the pipeline
152 var nextFile func()
153 nextFile = func() {
154 mySegCount := segCount
155 segCount += 1
156 segDone := make(chan struct{})
157 log.Debug(ctx, "moving to next file", "segCount", mySegCount)
158 pr, pw := io.Pipe()
159 go func() {
160 select {
161 case <-ctx.Done():
162 pr.Close()
163 pw.Close()
164 return
165 case bs := <-allFiles:
166 if len(bs) == 0 {
167 log.Warn(ctx, "no more segments, ending stream")
168 pr.Close()
169 pw.Close()
170 cancel()
171 return
172 }
173 _, err = io.Copy(pw, bytes.NewReader(bs))
174 if err != nil {
175 log.Error(ctx, "failed to copy segment file", "error", err)
176 cancel()
177 return
178 }
179 return
180 }
181 }()
182
183 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{
184 "name": fmt.Sprintf("concat-demux-%d", mySegCount),
185 })
186 if err != nil {
187 log.Error(ctx, "failed to create demux element", "error", err)
188 cancel()
189 return
190 }
191
192 err = pipeline.Add(demux)
193 if err != nil {
194 log.Error(ctx, "failed to add demux to pipeline", "error", err)
195 cancel()
196 return
197 }
198
199 demuxSinkPad := demux.GetStaticPad("sink")
200 if demuxSinkPad == nil {
201 log.Error(ctx, "failed to get demux sink pad")
202 cancel()
203 return
204 }
205
206 mu := sync.Mutex{}
207 count := 0
208 _, err = demux.Connect("pad-added", func(self *gst.Element, pad *gst.Pad) {
209 mu.Lock()
210 count += 1
211 mu.Unlock()
212 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection())
213 var downstreamPad *gst.Pad
214 if strings.HasPrefix(pad.GetName(), "video_") {
215 downstreamPad = inputQueuePadVideoSink
216 } else if strings.HasPrefix(pad.GetName(), "audio_") {
217 downstreamPad = inputQueuePadAudioSink
218 } else {
219 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection())
220 cancel()
221 return
222 }
223 ret := pad.Link(downstreamPad)
224 if ret != gst.PadLinkOK {
225 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret)
226 cancel()
227 return
228 }
229 if pad.GetDirection() == gst.PadDirectionSource {
230 pad.AddProbe(gst.PadProbeTypeEventBoth, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
231 if info.GetEvent().Type() != gst.EventTypeEOS {
232 return gst.PadProbeOK
233 }
234 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection())
235 pad.Unlink(downstreamPad)
236 mu.Lock()
237 defer mu.Unlock()
238 count -= 1
239
240 if count == 0 {
241 // don't keep going if our context is done
242 if ctx.Err() == nil {
243 go nextFile()
244 segDone <- struct{}{}
245 }
246 } else {
247 log.Debug(ctx, "demux has more pads, waiting for them to close")
248 }
249 return gst.PadProbeRemove
250 })
251 }
252 })
253 if err != nil {
254 log.Error(ctx, "failed to connect demux pad-added", "error", err)
255 cancel()
256 return
257 }
258
259 appsrc, err := gst.NewElementWithProperties("appsrc", map[string]any{
260 "name": fmt.Sprintf("concat-appsrc-%d", mySegCount),
261 "is-live": true,
262 })
263 if err != nil {
264 log.Error(ctx, "failed to get appsrc element from pipeline", "error", err)
265 cancel()
266 return
267 }
268
269 src := app.SrcFromElement(appsrc)
270
271 appSrcPad := appsrc.GetStaticPad("src")
272 if appSrcPad == nil {
273 log.Error(ctx, "failed to get appsrc pad")
274 cancel()
275 return
276 }
277
278 done := func() {
279 // appsrc.Unlink(demux)
280 pads, err := src.GetPads()
281 if err != nil {
282 log.Error(ctx, "failed to get pads", "error", err)
283 cancel()
284 return
285 }
286 for _, pad := range pads {
287 log.Debug(ctx, "setting pad-idle", "name", pad.GetName(), "direction", pad.GetDirection())
288
289 pad.AddProbe(gst.PadProbeTypeIdle, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
290 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection())
291 src.EndStream()
292 return gst.PadProbeRemove
293 })
294 }
295 }
296
297 src.SetAutomaticEOS(false)
298 src.SetCallbacks(&app.SourceCallbacks{
299 NeedDataFunc: func(self *app.Source, length uint) {
300 bs := make([]byte, length)
301 read, err := pr.Read(bs)
302 if err != nil {
303 if errors.Is(err, io.EOF) {
304 if read > 0 {
305 log.Debug(ctx, "got data on eof???")
306 cancel()
307 return
308 }
309 log.Debug(ctx, "EOF, ending segment", "length", read)
310 done()
311 return
312 } else {
313 log.Debug(ctx, "failed to read data, ending stream", "error", err)
314 cancel()
315 return
316 }
317 }
318 toPush := bs
319 if uint(read) < length {
320 toPush = bs[:read]
321 }
322 buffer := gst.NewBufferWithSize(int64(len(toPush)))
323 buffer.Map(gst.MapWrite).WriteData(toPush)
324 defer buffer.Unmap()
325 self.PushBuffer(buffer)
326
327 if uint(read) < length {
328 log.Debug(ctx, "short write, ending segment", "length", read)
329 done()
330 }
331 },
332 })
333 err = pipeline.Add(appsrc)
334 if err != nil {
335 log.Error(ctx, "failed to add appsrc to pipeline", "error", err)
336 cancel()
337 return
338 }
339
340 ret := appSrcPad.Link(demuxSinkPad)
341 if ret != gst.PadLinkOK {
342 log.Error(ctx, "failed to link appsrc to demux", "error", ret)
343 cancel()
344 return
345 }
346
347 err = demux.SetState(gst.StatePlaying)
348 if err != nil {
349 log.Error(ctx, "failed to set demux state", "error", err)
350 cancel()
351 return
352 }
353 err = appsrc.SetState(gst.StatePlaying)
354 if err != nil {
355 log.Error(ctx, "failed to set appsrc state", "error", err)
356 cancel()
357 return
358 }
359
360 select {
361 case <-ctx.Done():
362 return
363 case <-segDone:
364 }
365
366 log.Debug(ctx, "ending segment")
367 if err := demux.SetState(gst.StateNull); err != nil {
368 log.Error(ctx, "failed to set demux state", "error", err)
369 return
370 }
371 src.SetCallbacks(&app.SourceCallbacks{})
372 if err := appsrc.SetState(gst.StateNull); err != nil {
373 log.Error(ctx, "failed to set appsrc state", "error", err)
374 return
375 }
376 if err := pipeline.Remove(demux); err != nil {
377 log.Error(ctx, "failed to remove demux from pipleine", "error", err)
378 return
379 }
380 if err := pipeline.Remove(appsrc); err != nil {
381 log.Error(ctx, "failed to remove appsrc from pipleine", "error", err)
382 return
383 }
384 pr.Close()
385 pw.Close()
386 }
387
388 // fire it up!
389 go nextFile()
390
391 return outputQueue, ctx.Done(), nil
392}