Live video on the AT Protocol
1package media
2
import (
	"bytes"
	"context"
	"fmt"
	"strings"
	"sync"

	"github.com/go-gst/go-gst/gst"
	"github.com/go-gst/go-gst/gst/app"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/log"
)
14
15// silly technique to avoid leaking pads
16func doNothing(self *gst.Element, pad *gst.Pad) {}
17
18// Function for demuxing a single segment. Needs to be handled very carefully.
19// In particular: users of this MUST cancel the passed context when they're
20// done with the bin.
21func ConcatDemuxBin(ctx context.Context, seg *bus.Seg) (*gst.Bin, error) {
22 ctx = log.WithLogValues(ctx, "func", "SegDemuxBin")
23 bin := gst.NewBin("seg-demux-bin")
24
25 appSrc, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{
26 "name": "concat-appsrc",
27 })
28 if err != nil {
29 return nil, fmt.Errorf("failed to create appsrc element: %w", err)
30 }
31 err = bin.Add(appSrc)
32 if err != nil {
33 return nil, fmt.Errorf("failed to add appsrc to bin: %w", err)
34 }
35
36 demux, err := gst.NewElementWithProperties("qtdemux", map[string]interface{}{
37 "name": "concat-demux",
38 })
39 if err != nil {
40 return nil, fmt.Errorf("failed to create qtdemux element: %w", err)
41 }
42 err = bin.Add(demux)
43 if err != nil {
44 return nil, fmt.Errorf("failed to add qtdemux to bin: %w", err)
45 }
46
47 err = appSrc.Link(demux)
48 if err != nil {
49 return nil, fmt.Errorf("failed to link appsrc to qtdemux: %w", err)
50 }
51
52 tmpl := demux.GetPadTemplates()
53 if tmpl == nil {
54 return nil, fmt.Errorf("pad templates not found")
55 }
56
57 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{
58 "name": "concat-demux-multiqueue",
59 })
60 if err != nil {
61 return nil, fmt.Errorf("failed to create multiqueue element: %w", err)
62 }
63 err = bin.Add(mq)
64 if err != nil {
65 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err)
66 }
67
68 mqVideoSink := mq.GetRequestPad("sink_%u")
69 if mqVideoSink == nil {
70 return nil, fmt.Errorf("video sink pad not found")
71 }
72
73 mqAudioSink := mq.GetRequestPad("sink_%u")
74 if mqAudioSink == nil {
75 return nil, fmt.Errorf("audio sink pad not found")
76 }
77
78 mqVideoSrc := mq.GetStaticPad("src_0")
79 if mqVideoSrc == nil {
80 return nil, fmt.Errorf("video source pad not found")
81 }
82
83 mqAudioSrc := mq.GetStaticPad("src_1")
84 if mqAudioSrc == nil {
85 return nil, fmt.Errorf("audio source pad not found")
86 }
87
88 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc)
89 if videoGhost == nil {
90 return nil, fmt.Errorf("failed to create video ghost pad")
91 }
92
93 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc)
94 if audioGhost == nil {
95 return nil, fmt.Errorf("failed to create audio ghost pad")
96 }
97
98 needed := 2
99
100 var padAdded func(self *gst.Element, pad *gst.Pad)
101 // the defer funcs are needed to avoid leaking pads for some reason
102 padAdded = func(self *gst.Element, pad *gst.Pad) {
103 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection())
104 var downstreamPad *gst.Pad
105 if strings.HasPrefix(pad.GetName(), "video_") {
106 downstreamPad = mqVideoSink
107 } else if strings.HasPrefix(pad.GetName(), "audio_") {
108 downstreamPad = mqAudioSink
109 } else {
110 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection())
111 // cancel()
112 return
113 }
114 ret := pad.Link(downstreamPad)
115 if ret != gst.PadLinkOK {
116 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret)
117 // cancel()
118 return
119 }
120 needed--
121 if needed == 0 {
122 padAdded = doNothing
123 }
124 }
125 outerPadAdded := func(self *gst.Element, pad *gst.Pad) {
126 padAdded(self, pad)
127 }
128
129 // Necessary to avoid leaking `mqVideoSink` and `mqAudioSink` from the
130 // pad-added function in the case where we hit invalid data and
131 // pad-added never fires.
132 go func() {
133 <-ctx.Done()
134 padAdded = doNothing
135 }()
136
137 _, err = demux.Connect("pad-added", outerPadAdded)
138 if err != nil {
139 return nil, fmt.Errorf("failed to connect demux pad-added signal: %w", err)
140 }
141
142 ok := bin.AddPad(videoGhost.Pad)
143 if !ok {
144 return nil, fmt.Errorf("failed to add video ghost pad to bin")
145 }
146
147 ok = bin.AddPad(audioGhost.Pad)
148 if !ok {
149 return nil, fmt.Errorf("failed to add audio ghost pad to bin")
150 }
151
152 src := app.SrcFromElement(appSrc)
153 src.SetCallbacks(&app.SourceCallbacks{
154 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(seg.Data)),
155 })
156
157 return bin, nil
158}