Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "errors"
6 "fmt"
7
8 "github.com/go-gst/go-gst/gst"
9 "stream.place/streamplace/pkg/bus"
10 "stream.place/streamplace/pkg/log"
11)
12
13var ErrConcatDone = errors.New("concat done")
14
15func ConcatBin(ctx context.Context, segCh <-chan *bus.Seg) (*gst.Bin, error) {
16 ctx = log.WithLogValues(ctx, "func", "ConcatBin")
17 bin := gst.NewBin("concat-bin")
18
19 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{
20 "name": "concat-streamsynchronizer",
21 })
22 if err != nil {
23 return nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err)
24 }
25
26 err = bin.Add(streamsynchronizer)
27 if err != nil {
28 return nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err)
29 }
30
31 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u")
32 if syncPadVideoSink == nil {
33 return nil, fmt.Errorf("failed to get sync video sink pad")
34 }
35
36 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u")
37 if syncPadAudioSink == nil {
38 return nil, fmt.Errorf("failed to get sync audio sink pad")
39 }
40
41 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0")
42 if syncPadVideoSrc == nil {
43 return nil, fmt.Errorf("failed to get sync video src pad")
44 }
45
46 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1")
47 if syncPadAudioSrc == nil {
48 return nil, fmt.Errorf("failed to get sync audio src pad")
49 }
50
51 mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{
52 "name": "concat-multiqueue",
53 })
54 if err != nil {
55 return nil, fmt.Errorf("failed to create multiqueue element: %w", err)
56 }
57 err = bin.Add(mq)
58 if err != nil {
59 return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err)
60 }
61
62 mqVideoSink := mq.GetRequestPad("sink_%u")
63 if mqVideoSink == nil {
64 return nil, fmt.Errorf("video sink pad not found")
65 }
66
67 mqAudioSink := mq.GetRequestPad("sink_%u")
68 if mqAudioSink == nil {
69 return nil, fmt.Errorf("audio sink pad not found")
70 }
71
72 mqVideoSrc := mq.GetStaticPad("src_0")
73 if mqVideoSrc == nil {
74 return nil, fmt.Errorf("video source pad not found")
75 }
76
77 mqAudioSrc := mq.GetStaticPad("src_1")
78 if mqAudioSrc == nil {
79 return nil, fmt.Errorf("audio source pad not found")
80 }
81
82 linked := syncPadVideoSrc.Link(mqVideoSink)
83 if linked != gst.PadLinkOK {
84 return nil, fmt.Errorf("failed to link sync video src pad to multiqueue video sink pad: %v", linked)
85 }
86
87 linked = syncPadAudioSrc.Link(mqAudioSink)
88 if linked != gst.PadLinkOK {
89 return nil, fmt.Errorf("failed to link sync audio src pad to multiqueue audio sink pad: %v", linked)
90 }
91
92 videoGhost := gst.NewGhostPad("video_0", mqVideoSrc)
93 if videoGhost == nil {
94 return nil, fmt.Errorf("failed to create video ghost pad")
95 }
96
97 audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc)
98 if audioGhost == nil {
99 return nil, fmt.Errorf("failed to create audio ghost pad")
100 }
101
102 ok := bin.AddPad(videoGhost.Pad)
103 if !ok {
104 return nil, fmt.Errorf("failed to add video ghost pad to bin")
105 }
106
107 ok = bin.AddPad(audioGhost.Pad)
108 if !ok {
109 return nil, fmt.Errorf("failed to add audio ghost pad to bin")
110 }
111
112 go func() {
113 for {
114 select {
115 case seg := <-segCh:
116 if seg == nil {
117 bin.Error(ErrConcatDone.Error(), ErrConcatDone)
118 return
119 }
120 err := addConcatDemuxer(ctx, bin, seg, syncPadVideoSink, syncPadAudioSink)
121 if err != nil {
122 log.Error(ctx, "failed to add concat demuxer", "error", err)
123 bin.Error(err.Error(), err)
124 return
125 }
126 case <-ctx.Done():
127 return
128 }
129 }
130 }()
131
132 return bin, nil
133}
134
135func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *bus.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad) error {
136
137 log.Debug(ctx, "adding concat demuxer", "seg", seg.Filepath)
138 demuxBin, err := ConcatDemuxBin(ctx, seg)
139 if err != nil {
140 return fmt.Errorf("failed to create demux bin: %w", err)
141 }
142
143 err = bin.Add(demuxBin.Element)
144 if err != nil {
145 return fmt.Errorf("failed to add demux bin to bin: %w", err)
146 }
147
148 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0")
149 if demuxBinPadVideoSrc == nil {
150 return fmt.Errorf("failed to get demux bin video src pad")
151 }
152
153 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0")
154 if demuxBinPadAudioSrc == nil {
155 return fmt.Errorf("failed to get demux bin audio src pad")
156 }
157
158 linked := demuxBinPadVideoSrc.Link(syncPadVideoSink)
159 if linked != gst.PadLinkOK {
160 return fmt.Errorf("failed to link demux bin video src pad to sync video sink pad: %v", linked)
161 }
162
163 linked = demuxBinPadAudioSrc.Link(syncPadAudioSink)
164 if linked != gst.PadLinkOK {
165 return fmt.Errorf("failed to link demux bin audio src pad to sync audio sink pad: %v", linked)
166 }
167
168 eosCh := make(chan struct{})
169 eos := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
170 if pad.GetDirection() != gst.PadDirectionSource {
171 return gst.PadProbeOK
172 }
173 if info.GetEvent().Type() != gst.EventTypeEOS {
174 return gst.PadProbeOK
175 }
176 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection())
177 downstreamPad := pad.GetPeer()
178 unlinked := pad.Unlink(downstreamPad)
179 if !unlinked {
180 log.Error(ctx, "failed to unlink pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", unlinked)
181 }
182 go func() {
183 eosCh <- struct{}{}
184 }()
185 return gst.PadProbeRemove
186 }
187 demuxBinPadVideoSrc.AddProbe(gst.PadProbeTypeEventBoth, eos)
188 demuxBinPadAudioSrc.AddProbe(gst.PadProbeTypeEventBoth, eos)
189
190 if err := bin.SetState(gst.StatePlaying); err != nil {
191 return fmt.Errorf("failed to set state: %w", err)
192 }
193
194 <-eosCh
195 <-eosCh
196
197 err = bin.Remove(demuxBin.Element)
198 if err != nil {
199 return fmt.Errorf("failed to remove demux bin from bin: %w", err)
200 }
201
202 err = demuxBin.SetState(gst.StateNull)
203 if err != nil {
204 return fmt.Errorf("failed to set demux bin to null state: %w", err)
205 }
206
207 log.Debug(ctx, "removed concat demuxer", "seg", seg.Filepath)
208
209 return nil
210}