Live video on the AT Protocol
package media

import (
	"bytes"
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-gst/go-gst/gst"
	"github.com/go-gst/go-gst/gst/app"
	"stream.place/streamplace/pkg/log"
)

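// ToHLS parses the incoming H.264 video, transcodes the Opus audio track to
// AAC, and runs both through splitmuxsink to cut the stream into MPEG-TS HLS
// segments for the given rendition.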
func (mm *MediaManager) ToHLS(ctx context.Context, user string, rendition string, m3u8 *M3U8) error {
	ctx = log.WithLogValues(ctx, "GStreamerFunc", "ToHLS", "rendition", rendition)

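	// Two branches: video is already H.264 and only needs parsing, while
	// audio arrives as Opus and is decoded, resampled, and re-encoded to AAC.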
	pipelineSlice := []string{
		"h264parse name=videoparse",
		"opusdec use-inband-fec=true name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc",
	}

	pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
	if err != nil {
		return fmt.Errorf("error creating ToHLS pipeline: %w", err)
	}

	outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm.bus)
	if err != nil {
		return fmt.Errorf("failed to get output queue: %w", err)
	}

	videoParse, err := pipeline.GetElementByName("videoparse")
	if err != nil {
		return fmt.Errorf("failed to get videoparse element from pipeline: %w", err)
	}
	err = outputQueue.Link(videoParse)
	if err != nil {
		return fmt.Errorf("failed to link output queue to video parse: %w", err)
	}

	audioParse, err := pipeline.GetElementByName("audioparse")
	if err != nil {
		return fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
	}
	err = outputQueue.Link(audioParse)
	if err != nil {
		return fmt.Errorf("failed to link output queue to audio parse: %w", err)
	}

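	// splitmuxsink can only split on keyframes, so max-size-bytes=1 asks it
	// to cut a new fragment at every opportunity, effectively one segment per
	// GOP. async-finalize plus sink-factory=appsink means a fresh appsink is
	// created for each fragment (handled in sink-added below).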
	splitmuxsink, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
		"name":           "mux",
		"async-finalize": true,
		"sink-factory":   "appsink",
		"muxer-factory":  "mpegtsmux",
		"max-size-bytes": 1,
	})
	if err != nil {
		return fmt.Errorf("error creating splitmuxsink: %w", err)
	}

	r, err := m3u8.GetRendition(rendition)
	if err != nil {
		return fmt.Errorf("failed to get rendition: %w", err)
	}
	ps := NewPendingSegments(r)

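	// Request the video and audio pads up front so the videoparse and
	// audioenc links below have pads on the muxer to attach to.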
	p := splitmuxsink.GetRequestPad("video")
	if p == nil {
		return fmt.Errorf("failed to get video pad")
	}
	p = splitmuxsink.GetRequestPad("audio_%u")
	if p == nil {
		return fmt.Errorf("failed to get audio pad")
	}

	err = pipeline.Add(splitmuxsink)
	if err != nil {
		return fmt.Errorf("error adding splitmuxsink to ToHLS pipeline: %w", err)
	}

	err = videoParse.Link(splitmuxsink)
	if err != nil {
		return fmt.Errorf("error linking videoparse to splitmuxsink: %w", err)
	}

	audioenc, err := pipeline.GetElementByName("audioenc")
	if err != nil {
		return fmt.Errorf("error getting audioenc from ToHLS pipeline: %w", err)
	}
	err = audioenc.Link(splitmuxsink)
	if err != nil {
		return fmt.Errorf("error linking audioenc to splitmuxsink: %w", err)
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Tear the pipeline down once the upstream concat source finishes.
	go func() {
		select {
		case <-ctx.Done():
			return
		case <-done:
			cancel()
		}
	}()

	// Each appsink that splitmuxsink creates corresponds to one HLS segment;
	// wire its samples into a fresh pending segment's buffer.
	_, err = splitmuxsink.Connect("sink-added", func(split, sinkEle *gst.Element) {
		log.Debug(ctx, "hls-check sink-added")
		vf, err := ps.GetNextSegment(ctx)
		if err != nil {
			panic(err)
		}
		appsink := app.SinkFromElement(sinkEle)
		appsink.SetCallbacks(&app.SinkCallbacks{
			NewSampleFunc: WriterNewSample(ctx, vf.Buf),
			EOSFunc: func(sink *app.Sink) {
				log.Debug(ctx, "hls-check Segment EOS", "buf", vf.Buf.Len())
				ps.CloseSegment(ctx, vf)
			},
		})
	})
	if err != nil {
		return fmt.Errorf("failed to add hls-check to sink: %w", err)
	}

	onPadAdded := func(element *gst.Element, pad *gst.Pad) {
		caps := pad.GetCurrentCaps()
		if caps == nil {
			log.Warn(ctx, "unable to get pad caps")
			return
		}

		log.Debug(ctx, "New pad added", "pad", pad.GetName(), "caps", caps.String())

		structure := caps.GetStructureAt(0)
		if structure == nil {
			log.Warn(ctx, "unable to get structure from caps")
			return
		}

		log.Debug(ctx, "pad structure", "name", structure.Name())
	}

	_, err = splitmuxsink.Connect("pad-added", onPadAdded)
	if err != nil {
		return fmt.Errorf("failed to connect pad-added handler: %w", err)
	}

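	// Watch the pipeline bus for splitmuxsink's fragment-opened/closed
	// messages, which carry the running-time bounds of each segment.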
	go func() {
		err := HandleBusMessagesCustom(ctx, pipeline, func(msg *gst.Message) {
			switch msg.Type() {
			case gst.MessageElement:
				structure := msg.GetStructure()
				name := structure.Name()
				if name == "splitmuxsink-fragment-opened" {
					runningTime, err := structure.GetValue("running-time")
					if err != nil {
						log.Debug(ctx, "splitmuxsink-fragment-opened error", "error", err)
						cancel()
						return
					}
					runningTimeInt, ok := runningTime.(uint64)
					if !ok {
						log.Warn(ctx, "splitmuxsink-fragment-opened running-time is not a uint64")
						cancel()
						return
					}
					log.Debug(ctx, "hls-check splitmuxsink-fragment-opened", "runningTime", runningTimeInt)
					if err := ps.FragmentOpened(ctx, runningTimeInt); err != nil {
						log.Debug(ctx, "fragment open error", "error", err)
						cancel()
					}
				}
				if name == "splitmuxsink-fragment-closed" {
					runningTime, err := structure.GetValue("running-time")
					if err != nil {
						log.Debug(ctx, "splitmuxsink-fragment-closed error", "error", err)
						cancel()
						return
					}
					runningTimeInt, ok := runningTime.(uint64)
					if !ok {
						log.Warn(ctx, "splitmuxsink-fragment-closed running-time is not a uint64")
						cancel()
						return
					}
					log.Debug(ctx, "hls-check splitmuxsink-fragment-closed", "runningTime", runningTimeInt)
					if err := ps.FragmentClosed(ctx, runningTimeInt); err != nil {
						log.Debug(ctx, "fragment close error", "error", err)
						cancel()
					}
				}
			}
		})
		if err != nil {
			log.Log(ctx, "pipeline error", "error", err)
		}
		cancel()
	}()

	// Start the pipeline
	if err := pipeline.SetState(gst.StatePlaying); err != nil {
		return fmt.Errorf("error setting pipeline state: %w", err)
	}

	<-ctx.Done()

	if err := pipeline.BlockSetState(gst.StateNull); err != nil {
		return fmt.Errorf("error stopping pipeline: %w", err)
	}

	return nil
}

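// PendingSegments tracks segments that have been started but not yet
// finalized into the playlist.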
type PendingSegments struct {
	segments  []*Segment
	lock      sync.Mutex
	rendition *M3U8Rendition
}

func NewPendingSegments(rendition *M3U8Rendition) *PendingSegments {
	return &PendingSegments{
		segments:  []*Segment{},
		rendition: rendition,
	}
}

func (ps *PendingSegments) GetNextSegment(ctx context.Context) (*Segment, error) {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	log.Debug(ctx, "next segment")
	seg := &Segment{
		Buf:    &bytes.Buffer{},
		Time:   time.Now(),
		Closed: false,
	}
	ps.segments = append(ps.segments, seg)
	return seg, nil
}

func (ps *PendingSegments) CloseSegment(ctx context.Context, seg *Segment) {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	log.Debug(ctx, "close segment", "MSN", seg.MSN)
	seg.Closed = true
	if err := ps.checkSegments(ctx); err != nil {
		log.Debug(ctx, "failed to check segments", "error", err)
	}
}

func (ps *PendingSegments) FragmentOpened(ctx context.Context, t uint64) error {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	log.Debug(ctx, "fragment opened", "time", t)
	if len(ps.segments) == 0 {
		return fmt.Errorf("no pending segments")
	}
	for _, seg := range ps.segments {
		if seg.StartTS == nil {
			seg.StartTS = &t
			break
		}
	}
	if err := ps.checkSegments(ctx); err != nil {
		return fmt.Errorf("failed to check segments: %w", err)
	}
	return nil
}

func (ps *PendingSegments) FragmentClosed(ctx context.Context, t uint64) error {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	log.Debug(ctx, "fragment closed", "time", t)
	if len(ps.segments) == 0 {
		return fmt.Errorf("no pending segments")
	}
	for _, seg := range ps.segments {
		if seg.EndTS == nil {
			seg.EndTS = &t
			if seg.StartTS != nil {
				// running-time is in nanoseconds, so the difference maps
				// directly onto time.Duration.
				seg.Duration = time.Duration(*seg.EndTS - *seg.StartTS)
			}
			break
		}
	}
	if err := ps.checkSegments(ctx); err != nil {
		return fmt.Errorf("failed to check segments: %w", err)
	}
	return nil
}

// The tricky piece of the design here is that GetNextSegment, CloseSegment,
// FragmentOpened, and FragmentClosed can be called in any order. So all of
// those functions call this one, which checks whether we have the necessary
// information to finalize a segment and add it to our playlist.
// Only call while holding ps.lock!
func (ps *PendingSegments) checkSegments(ctx context.Context) error {
	if len(ps.segments) == 0 {
		return nil
	}
	pending := ps.segments[0]
	if pending.StartTS != nil && pending.EndTS != nil && pending.Closed {
		if err := ps.rendition.NewSegment(pending); err != nil {
			return fmt.Errorf("failed to add new segment: %w", err)
		}
		log.Debug(ctx, "finalizing segment", "MSN", pending.MSN)
		ps.segments = ps.segments[1:]
	}
	return nil
}
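
To make that ordering guarantee concrete, here is a minimal sketch of one segment's lifecycle (the driver below is hypothetical; rendition stands in for a real *M3U8Rendition, and the timestamps are invented):

ps := NewPendingSegments(rendition)

// sink-added fires: a fresh appsink gets a pending segment to buffer into.
seg, _ := ps.GetNextSegment(ctx)

// Bus thread: splitmuxsink-fragment-opened at running-time 0.
_ = ps.FragmentOpened(ctx, 0)

// The appsink's EOS may land before or after fragment-closed; either order
// works because every call funnels through checkSegments.
ps.CloseSegment(ctx, seg)

// Bus thread: the fragment closes two seconds later. All three facts are now
// known, so checkSegments finalizes the segment into the playlist.
_ = ps.FragmentClosed(ctx, uint64(2*time.Second))

Whichever call happens to arrive last is the one that triggers finalization.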