Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "errors"
6 "fmt"
7
8 "github.com/go-gst/go-gst/gst"
9 "stream.place/streamplace/pkg/bus"
10 "stream.place/streamplace/pkg/log"
11)
12
// ErrConcatDone is a sentinel error carrying the message "concat done".
//
// NOTE(review): nothing in the visible part of this file returns it;
// presumably it marks the normal end of a concatenation — confirm against
// callers. Compare with errors.Is.
var ErrConcatDone = errors.New("concat done")
14
15func ConcatBin(ctx context.Context, segCh <-chan *bus.Seg, doH264Parse bool) (*gst.Bin, error) {
16 ctx = log.WithLogValues(ctx, "func", "ConcatBin")
17 bin := gst.NewBin("concat-bin")
18
19 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{
20 "name": "concat-streamsynchronizer",
21 })
22 if err != nil {
23 return nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err)
24 }
25
26 err = bin.Add(streamsynchronizer)
27 if err != nil {
28 return nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err)
29 }
30
31 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u")
32 if syncPadVideoSink == nil {
33 return nil, fmt.Errorf("failed to get sync video sink pad")
34 }
35
36 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u")
37 if syncPadAudioSink == nil {
38 return nil, fmt.Errorf("failed to get sync audio sink pad")
39 }
40
41 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0")
42 if syncPadVideoSrc == nil {
43 return nil, fmt.Errorf("failed to get sync video src pad")
44 }
45
46 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1")
47 if syncPadAudioSrc == nil {
48 return nil, fmt.Errorf("failed to get sync audio src pad")
49 }
50
51 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{
52 "name": "concat-multiqueue",
53 })
54 if err != nil {
55 return nil, fmt.Errorf("failed to create multiqueue element: %w", err)
56 }
57 err = bin.Add(mq)
58 if err != nil {
59 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err)
60 }
61
62 // 10x default multiqueue size
63 err = mq.SetProperty("max-size-time", uint64(200000000000))
64 if err != nil {
65 return nil, fmt.Errorf("failed to set max-size-time: %w", err)
66 }
67 err = mq.SetProperty("max-size-bytes", uint(1048576000))
68 if err != nil {
69 return nil, fmt.Errorf("failed to set max-size-bytes: %w", err)
70 }
71 err = mq.SetProperty("max-size-buffers", uint(500))
72 if err != nil {
73 return nil, fmt.Errorf("failed to set max-size-buffers: %w", err)
74 }
75
76 mqVideoSink := mq.GetRequestPad("sink_%u")
77 if mqVideoSink == nil {
78 return nil, fmt.Errorf("video sink pad not found")
79 }
80
81 mqAudioSink := mq.GetRequestPad("sink_%u")
82 if mqAudioSink == nil {
83 return nil, fmt.Errorf("audio sink pad not found")
84 }
85
86 mqVideoSrc := mq.GetStaticPad("src_0")
87 if mqVideoSrc == nil {
88 return nil, fmt.Errorf("video source pad not found")
89 }
90
91 mqAudioSrc := mq.GetStaticPad("src_1")
92 if mqAudioSrc == nil {
93 return nil, fmt.Errorf("audio source pad not found")
94 }
95
96 linked := syncPadVideoSrc.Link(mqVideoSink)
97 if linked != gst.PadLinkOK {
98 return nil, fmt.Errorf("failed to link sync video src pad to multiqueue video sink pad: %v", linked)
99 }
100
101 linked = syncPadAudioSrc.Link(mqAudioSink)
102 if linked != gst.PadLinkOK {
103 return nil, fmt.Errorf("failed to link sync audio src pad to multiqueue audio sink pad: %v", linked)
104 }
105
106 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc)
107 if videoGhost == nil {
108 return nil, fmt.Errorf("failed to create video ghost pad")
109 }
110
111 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc)
112 if audioGhost == nil {
113 return nil, fmt.Errorf("failed to create audio ghost pad")
114 }
115
116 ok := bin.AddPad(videoGhost.Pad)
117 if !ok {
118 return nil, fmt.Errorf("failed to add video ghost pad to bin")
119 }
120
121 ok = bin.AddPad(audioGhost.Pad)
122 if !ok {
123 return nil, fmt.Errorf("failed to add audio ghost pad to bin")
124 }
125
126 go func() {
127 for {
128 select {
129 case seg := <-segCh:
130 if seg == nil {
131
132 ok := syncPadVideoSrc.PushEvent(gst.NewEOSEvent())
133 if !ok {
134 log.Error(ctx, "failed to post EOS message", "error", ok)
135 }
136 ok = syncPadAudioSrc.PushEvent(gst.NewEOSEvent())
137 if !ok {
138 log.Error(ctx, "failed to post EOS message", "error", ok)
139 }
140 log.Debug(ctx, "concat completed")
141
142 return
143 }
144 err := addConcatDemuxer(ctx, bin, seg, syncPadVideoSink, syncPadAudioSink, doH264Parse)
145 if err != nil {
146 log.Error(ctx, "failed to add concat demuxer", "error", err)
147 bin.Error(err.Error(), err)
148 return
149 }
150 case <-ctx.Done():
151 return
152 }
153 }
154 }()
155
156 return bin, nil
157}
158
159func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *bus.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad, doH264Parse bool) error {
160 var cancel context.CancelFunc
161 ctx, cancel = context.WithCancel(ctx)
162 defer cancel()
163 ctx = log.WithLogValues(ctx, "func", "ConcatBin")
164
165 log.Debug(ctx, "adding concat demuxer", "seg", seg.Filepath)
166 demuxBin, err := ConcatDemuxBin(ctx, seg, doH264Parse)
167 if err != nil {
168 return fmt.Errorf("failed to create demux bin: %w", err)
169 }
170
171 err = bin.Add(demuxBin.Element)
172 if err != nil {
173 return fmt.Errorf("failed to add demux bin to bin: %w", err)
174 }
175
176 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0")
177 if demuxBinPadVideoSrc == nil {
178 return fmt.Errorf("failed to get demux bin video src pad")
179 }
180
181 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0")
182 if demuxBinPadAudioSrc == nil {
183 return fmt.Errorf("failed to get demux bin audio src pad")
184 }
185
186 linked := demuxBinPadVideoSrc.Link(syncPadVideoSink)
187 if linked != gst.PadLinkOK {
188 return fmt.Errorf("failed to link demux bin video src pad to sync video sink pad: %v", linked)
189 }
190
191 linked = demuxBinPadAudioSrc.Link(syncPadAudioSink)
192 if linked != gst.PadLinkOK {
193 return fmt.Errorf("failed to link demux bin audio src pad to sync audio sink pad: %v", linked)
194 }
195
196 eosCh := make(chan struct{})
197 eos := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
198 if pad.GetDirection() != gst.PadDirectionSource {
199 return gst.PadProbeOK
200 }
201 if info.GetEvent().Type() != gst.EventTypeEOS {
202 return gst.PadProbeOK
203 }
204 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection())
205 downstreamPad := pad.GetPeer()
206 unlinked := pad.Unlink(downstreamPad)
207 if !unlinked {
208 log.Error(ctx, "failed to unlink pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", unlinked)
209 }
210 go func() {
211 eosCh <- struct{}{}
212 }()
213 return gst.PadProbeRemove
214 }
215 demuxBinPadVideoSrc.AddProbe(gst.PadProbeTypeEventBoth, eos)
216 demuxBinPadAudioSrc.AddProbe(gst.PadProbeTypeEventBoth, eos)
217
218 if err := bin.SetState(gst.StatePlaying); err != nil {
219 return fmt.Errorf("failed to set state: %w", err)
220 }
221
222 <-eosCh
223 <-eosCh
224
225 err = bin.Remove(demuxBin.Element)
226 if err != nil {
227 return fmt.Errorf("failed to remove demux bin from bin: %w", err)
228 }
229
230 err = demuxBin.SetState(gst.StateNull)
231 if err != nil {
232 return fmt.Errorf("failed to set demux bin to null state: %w", err)
233 }
234
235 log.Debug(ctx, "removed concat demuxer", "seg", seg.Filepath)
236
237 return nil
238}