Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "strings"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "stream.place/streamplace/pkg/bus"
12 "stream.place/streamplace/pkg/log"
13)
14
15// silly technique to avoid leaking pads
16func doNothing(self *gst.Element, pad *gst.Pad) {}
17
18func ConcatDemuxBin(ctx context.Context, seg *bus.Seg) (*gst.Bin, error) {
19 ctx = log.WithLogValues(ctx, "func", "SegDemuxBin")
20 bin := gst.NewBin("seg-demux-bin")
21
22 appSrc, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{
23 "name": "concat-appsrc",
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to create appsrc element: %w", err)
27 }
28 err = bin.Add(appSrc)
29 if err != nil {
30 return nil, fmt.Errorf("failed to add appsrc to bin: %w", err)
31 }
32
33 demux, err := gst.NewElementWithProperties("qtdemux", map[string]interface{}{
34 "name": "concat-demux",
35 })
36 if err != nil {
37 return nil, fmt.Errorf("failed to create qtdemux element: %w", err)
38 }
39 err = bin.Add(demux)
40 if err != nil {
41 return nil, fmt.Errorf("failed to add qtdemux to bin: %w", err)
42 }
43
44 err = appSrc.Link(demux)
45 if err != nil {
46 return nil, fmt.Errorf("failed to link appsrc to qtdemux: %w", err)
47 }
48
49 tmpl := demux.GetPadTemplates()
50 if tmpl == nil {
51 return nil, fmt.Errorf("pad templates not found")
52 }
53
54 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{
55 "name": "concat-demux-multiqueue",
56 })
57 if err != nil {
58 return nil, fmt.Errorf("failed to create multiqueue element: %w", err)
59 }
60 err = bin.Add(mq)
61 if err != nil {
62 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err)
63 }
64
65 mqVideoSink := mq.GetRequestPad("sink_%u")
66 if mqVideoSink == nil {
67 return nil, fmt.Errorf("video sink pad not found")
68 }
69
70 mqAudioSink := mq.GetRequestPad("sink_%u")
71 if mqAudioSink == nil {
72 return nil, fmt.Errorf("audio sink pad not found")
73 }
74
75 mqVideoSrc := mq.GetStaticPad("src_0")
76 if mqVideoSrc == nil {
77 return nil, fmt.Errorf("video source pad not found")
78 }
79
80 mqAudioSrc := mq.GetStaticPad("src_1")
81 if mqAudioSrc == nil {
82 return nil, fmt.Errorf("audio source pad not found")
83 }
84
85 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc)
86 if videoGhost == nil {
87 return nil, fmt.Errorf("failed to create video ghost pad")
88 }
89
90 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc)
91 if audioGhost == nil {
92 return nil, fmt.Errorf("failed to create audio ghost pad")
93 }
94
95 needed := 2
96
97 var padAdded func(self *gst.Element, pad *gst.Pad)
98 // the defer funcs are needed to avoid leaking pads for some reason
99 padAdded = func(self *gst.Element, pad *gst.Pad) {
100 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection())
101 var downstreamPad *gst.Pad
102 if strings.HasPrefix(pad.GetName(), "video_") {
103 downstreamPad = mqVideoSink
104 // defer func() { mqVideoSink = nil }()
105 } else if strings.HasPrefix(pad.GetName(), "audio_") {
106 downstreamPad = mqAudioSink
107 // defer func() { mqAudioSink = nil }()
108 } else {
109 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection())
110 // cancel()
111 return
112 }
113 ret := pad.Link(downstreamPad)
114 if ret != gst.PadLinkOK {
115 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret)
116 // cancel()
117 return
118 }
119 needed--
120 if needed == 0 {
121 padAdded = doNothing
122 }
123 }
124 outerPadAdded := func(self *gst.Element, pad *gst.Pad) {
125 padAdded(self, pad)
126 }
127
128 _, err = demux.Connect("pad-added", outerPadAdded)
129 if err != nil {
130 return nil, fmt.Errorf("failed to connect demux pad-added signal: %w", err)
131 }
132
133 ok := bin.AddPad(videoGhost.Pad)
134 if !ok {
135 return nil, fmt.Errorf("failed to add video ghost pad to bin")
136 }
137
138 ok = bin.AddPad(audioGhost.Pad)
139 if !ok {
140 return nil, fmt.Errorf("failed to add audio ghost pad to bin")
141 }
142
143 src := app.SrcFromElement(appSrc)
144 src.SetCallbacks(&app.SourceCallbacks{
145 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(seg.Data)),
146 })
147
148 return bin, nil
149}