// Live video on the AT Protocol

1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "strings"
8 "sync"
9 "time"
10
11 "github.com/go-gst/go-gst/gst"
12 "github.com/go-gst/go-gst/gst/app"
13 "stream.place/streamplace/pkg/log"
14)
15
16func (mm *MediaManager) ToHLS(ctx context.Context, user string, rendition string, m3u8 *M3U8) error {
17 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ToHLS", "rendition", rendition)
18
19 pipelineSlice := []string{
20 "h264parse name=videoparse",
21 "opusdec use-inband-fec=true name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc",
22 }
23
24 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
25 if err != nil {
26 return fmt.Errorf("error creating ToHLS pipeline: %w", err)
27 }
28
29 outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm)
30 if err != nil {
31 return fmt.Errorf("failed to get output queue: %w", err)
32 }
33
34 videoParse, err := pipeline.GetElementByName("videoparse")
35 if err != nil {
36 return fmt.Errorf("failed to get video sink element from pipeline: %w", err)
37 }
38 err = outputQueue.Link(videoParse)
39 if err != nil {
40 return fmt.Errorf("failed to link output queue to video parse: %w", err)
41 }
42
43 audioParse, err := pipeline.GetElementByName("audioparse")
44 if err != nil {
45 return fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
46 }
47 err = outputQueue.Link(audioParse)
48 if err != nil {
49 return fmt.Errorf("failed to link output queue to audio parse: %w", err)
50 }
51
52 splitmuxsink, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
53 "name": "mux",
54 "async-finalize": true,
55 "sink-factory": "appsink",
56 "muxer-factory": "mpegtsmux",
57 "max-size-bytes": 1,
58 })
59 if err != nil {
60 return err
61 }
62
63 r := m3u8.GetRendition(rendition)
64 defer func() { r = nil }()
65 ps := NewPendingSegments(r)
66 defer func() { ps = nil }()
67
68 p := splitmuxsink.GetRequestPad("video")
69 if p == nil {
70 return fmt.Errorf("failed to get video pad")
71 }
72 p = splitmuxsink.GetRequestPad("audio_%u")
73 if p == nil {
74 return fmt.Errorf("failed to get audio pad")
75 }
76
77 err = pipeline.Add(splitmuxsink)
78 if err != nil {
79 return fmt.Errorf("error adding splitmuxsink to ToHLS pipeline: %w", err)
80 }
81
82 videoparse, err := pipeline.GetElementByName("videoparse")
83 if err != nil {
84 return fmt.Errorf("error getting videoparse from ToHLS pipeline: %w", err)
85 }
86 err = videoparse.Link(splitmuxsink)
87 if err != nil {
88 return fmt.Errorf("error linking videoparse to splitmuxsink: %w", err)
89 }
90
91 audioenc, err := pipeline.GetElementByName("audioenc")
92 if err != nil {
93 return fmt.Errorf("error getting audioenc from ToHLS pipeline: %w", err)
94 }
95 err = audioenc.Link(splitmuxsink)
96 if err != nil {
97 return fmt.Errorf("error linking audioenc to splitmuxsink: %w", err)
98 }
99
100 ctx, cancel := context.WithCancel(ctx)
101
102 go func() {
103 select {
104 case <-ctx.Done():
105 return
106 case <-done:
107 cancel()
108 }
109 }()
110
111 _, err = splitmuxsink.Connect("sink-added", func(split, sinkEle *gst.Element) {
112 log.Debug(ctx, "hls-check sink-added")
113 vf, err := ps.GetNextSegment(ctx)
114 if err != nil {
115 panic(err)
116 }
117 appsink := app.SinkFromElement(sinkEle)
118 appsink.SetCallbacks(&app.SinkCallbacks{
119 NewSampleFunc: WriterNewSample(ctx, vf.Buf),
120 EOSFunc: func(sink *app.Sink) {
121 log.Debug(ctx, "hls-check Segment EOS", "buf", vf.Buf.Len())
122 ps.CloseSegment(ctx, vf)
123 },
124 })
125 })
126 if err != nil {
127 return fmt.Errorf("failed to add hls-check to sink: %w", err)
128 }
129
130 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
131 caps := pad.GetCurrentCaps()
132 if caps == nil {
133 fmt.Println("Unable to get pad caps")
134 return
135 }
136
137 log.Debug(ctx, "New pad added", "pad", pad.GetName(), "caps", caps.String())
138
139 structure := caps.GetStructureAt(0)
140 if structure == nil {
141 fmt.Println("Unable to get structure from caps")
142 return
143 }
144
145 name := structure.Name()
146 fmt.Printf("Structure Name: %s\n", name)
147 }
148
149 _, err = splitmuxsink.Connect("pad-added", onPadAdded)
150 if err != nil {
151 return fmt.Errorf("failed to add pad: %w", err)
152 }
153
154 defer cancel()
155 go func() {
156 err := HandleBusMessagesCustom(ctx, pipeline, func(msg *gst.Message) {
157 switch msg.Type() {
158 case gst.MessageElement:
159 structure := msg.GetStructure()
160 name := structure.Name()
161 if name == "splitmuxsink-fragment-opened" {
162 runningTime, err := structure.GetValue("running-time")
163 if err != nil {
164 log.Debug(ctx, "splitmuxsink-fragment-opened error", "error", err)
165 cancel()
166 }
167 runningTimeInt, ok := runningTime.(uint64)
168 if !ok {
169 log.Warn(ctx, "splitmuxsink-fragment-opened not a uint64")
170 cancel()
171 }
172 log.Debug(ctx, "hls-check splitmuxsink-fragment-opened", "runningTime", runningTimeInt)
173 if err := ps.FragmentOpened(ctx, runningTimeInt); err != nil {
174 log.Debug(ctx, "fragment open error", "error", err)
175 cancel()
176 }
177 }
178 if name == "splitmuxsink-fragment-closed" {
179 runningTime, err := structure.GetValue("running-time")
180 if err != nil {
181 log.Debug(ctx, "splitmuxsink-fragment-closed error", "error", err)
182 cancel()
183 }
184 runningTimeInt, ok := runningTime.(uint64)
185 if !ok {
186 log.Warn(ctx, "splitmuxsink-fragment-closed not a uint64")
187 cancel()
188 }
189 log.Debug(ctx, "hls-check splitmuxsink-fragment-closed", "runningTime", runningTimeInt)
190 if err := ps.FragmentClosed(ctx, runningTimeInt); err != nil {
191 log.Debug(ctx, "fragment close error", "error", err)
192 cancel()
193 }
194 }
195 }
196 })
197 if err != nil {
198 log.Log(ctx, "pipeline error", "error", err)
199 }
200 cancel()
201 }()
202
203 // Start the pipeline
204 if err := pipeline.SetState(gst.StatePlaying); err != nil {
205 return fmt.Errorf("error setting pipeline state: %w", err)
206 }
207
208 <-ctx.Done()
209
210 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
211 return fmt.Errorf("error setting pipeline state: %w", err)
212 }
213
214 return nil
215}
216
217type PendingSegments struct {
218 segments []*Segment
219 lock sync.Mutex
220 rendition *M3U8Rendition
221}
222
223func NewPendingSegments(rendition *M3U8Rendition) *PendingSegments {
224 return &PendingSegments{
225 segments: []*Segment{},
226 lock: sync.Mutex{},
227 rendition: rendition,
228 }
229}
230
231func (ps *PendingSegments) GetNextSegment(ctx context.Context) (*Segment, error) {
232 ps.lock.Lock()
233 defer ps.lock.Unlock()
234 log.Debug(ctx, "next segment")
235 seg := &Segment{
236 Buf: &bytes.Buffer{},
237 Time: time.Now(),
238 Closed: false,
239 }
240 ps.segments = append(ps.segments, seg)
241 return seg, nil
242}
243
244func (ps *PendingSegments) CloseSegment(ctx context.Context, seg *Segment) {
245 ps.lock.Lock()
246 defer ps.lock.Unlock()
247 log.Debug(ctx, "close segment", "MSN", seg.MSN)
248 seg.Closed = true
249 if err := ps.checkSegments(ctx); err != nil {
250 log.Debug(ctx, "faile to check segments segment")
251 }
252}
253
254func (ps *PendingSegments) FragmentOpened(ctx context.Context, t uint64) error {
255 ps.lock.Lock()
256 defer ps.lock.Unlock()
257 log.Debug(ctx, "fragment opened", "time", t)
258 if len(ps.segments) == 0 {
259 return fmt.Errorf("no pending segments")
260 }
261 for _, seg := range ps.segments {
262 if seg.StartTS == nil {
263 seg.StartTS = &t
264 break
265 }
266 }
267 if err := ps.checkSegments(ctx); err != nil {
268 return fmt.Errorf("failed to check segments: %w", err)
269 }
270 return nil
271}
272
273func (ps *PendingSegments) FragmentClosed(ctx context.Context, t uint64) error {
274 ps.lock.Lock()
275 defer ps.lock.Unlock()
276 log.Debug(ctx, "fragment closed", "time", t)
277 if len(ps.segments) == 0 {
278 return fmt.Errorf("no pending segments")
279 }
280 for _, seg := range ps.segments {
281 if seg.EndTS == nil {
282 seg.EndTS = &t
283 dur := *seg.EndTS - *seg.StartTS
284 seg.Duration = time.Duration(dur)
285 break
286 }
287 }
288 if err := ps.checkSegments(ctx); err != nil {
289 return fmt.Errorf("failed to check segments: %w", err)
290 }
291 return nil
292}
293
294// the tricky piece of the design here is that we need to expect GetNextSegment,
295// CloseSegment, FragmentOpened, and FragmentClosed to be called in any order. So
296// all of those functions call this one, and it checks if we have the necessary information
297// to finalize a segment and add it to our playlist.
298// only call if you're holding ps.lock!
299func (ps *PendingSegments) checkSegments(ctx context.Context) error {
300 pending := ps.segments[0]
301 if pending.StartTS != nil && pending.EndTS != nil && pending.Closed {
302 if err := ps.rendition.NewSegment(pending); err != nil {
303 return fmt.Errorf("failed to add new segment: %w", err)
304 }
305 log.Debug(ctx, "finalizing segment", "MSN", pending.MSN)
306 ps.segments = ps.segments[1:]
307 }
308 return nil
309}