Live video on the AT Protocol
at eli/bump-xcode 312 lines 8.6 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strings" 8 "sync" 9 "time" 10 11 "github.com/go-gst/go-gst/gst" 12 "github.com/go-gst/go-gst/gst/app" 13 "stream.place/streamplace/pkg/log" 14) 15 16func (mm *MediaManager) ToHLS(ctx context.Context, user string, rendition string, m3u8 *M3U8) error { 17 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ToHLS", "rendition", rendition) 18 19 pipelineSlice := []string{ 20 "h264parse name=videoparse", 21 "opusdec use-inband-fec=true name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc", 22 } 23 24 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 25 if err != nil { 26 return fmt.Errorf("error creating ToHLS pipeline: %w", err) 27 } 28 29 outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm.bus) 30 if err != nil { 31 return fmt.Errorf("failed to get output queue: %w", err) 32 } 33 34 videoParse, err := pipeline.GetElementByName("videoparse") 35 if err != nil { 36 return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 37 } 38 err = outputQueue.Link(videoParse) 39 if err != nil { 40 return fmt.Errorf("failed to link output queue to video parse: %w", err) 41 } 42 43 audioParse, err := pipeline.GetElementByName("audioparse") 44 if err != nil { 45 return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 46 } 47 err = outputQueue.Link(audioParse) 48 if err != nil { 49 return fmt.Errorf("failed to link output queue to audio parse: %w", err) 50 } 51 52 splitmuxsink, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 53 "name": "mux", 54 "async-finalize": true, 55 "sink-factory": "appsink", 56 "muxer-factory": "mpegtsmux", 57 "max-size-bytes": 1, 58 }) 59 if err != nil { 60 return err 61 } 62 63 r, err := m3u8.GetRendition(rendition) 64 if err != nil { 65 return fmt.Errorf("failed to get rendition: %w", err) 66 } 67 defer func() { r = nil }() 68 ps := NewPendingSegments(r) 69 defer func() { ps = nil }() 70 71 p := splitmuxsink.GetRequestPad("video") 72 if p == nil { 73 return fmt.Errorf("failed to get video pad") 74 } 75 p = splitmuxsink.GetRequestPad("audio_%u") 76 if p == nil { 77 return fmt.Errorf("failed to get audio pad") 78 } 79 80 err = pipeline.Add(splitmuxsink) 81 if err != nil { 82 return fmt.Errorf("error adding splitmuxsink to ToHLS pipeline: %w", err) 83 } 84 85 videoparse, err := pipeline.GetElementByName("videoparse") 86 if err != nil { 87 return fmt.Errorf("error getting videoparse from ToHLS pipeline: %w", err) 88 } 89 err = videoparse.Link(splitmuxsink) 90 if err != nil { 91 return fmt.Errorf("error linking videoparse to splitmuxsink: %w", err) 92 } 93 94 audioenc, err := pipeline.GetElementByName("audioenc") 95 if err != nil { 96 return fmt.Errorf("error getting audioenc from ToHLS pipeline: %w", err) 97 } 98 err = audioenc.Link(splitmuxsink) 99 if err != nil { 100 return fmt.Errorf("error linking audioenc to splitmuxsink: %w", err) 101 } 102 103 ctx, cancel := context.WithCancel(ctx) 104 105 go func() { 106 select { 107 case <-ctx.Done(): 108 return 109 case <-done: 110 cancel() 111 } 112 }() 113 114 _, err = splitmuxsink.Connect("sink-added", func(split, sinkEle *gst.Element) { 115 log.Debug(ctx, "hls-check sink-added") 116 vf, err := ps.GetNextSegment(ctx) 117 if err != nil { 118 panic(err) 119 } 120 appsink := app.SinkFromElement(sinkEle) 121 appsink.SetCallbacks(&app.SinkCallbacks{ 122 NewSampleFunc: WriterNewSample(ctx, vf.Buf), 123 EOSFunc: func(sink *app.Sink) { 124 log.Debug(ctx, "hls-check Segment EOS", "buf", vf.Buf.Len()) 125 ps.CloseSegment(ctx, vf) 126 }, 127 }) 128 }) 129 if err != nil { 130 return fmt.Errorf("failed to add hls-check to sink: %w", err) 131 } 132 133 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 134 caps := pad.GetCurrentCaps() 135 if caps == nil { 136 fmt.Println("Unable to get pad caps") 137 return 138 } 139 140 log.Debug(ctx, "New pad added", "pad", pad.GetName(), "caps", caps.String()) 141 142 structure := caps.GetStructureAt(0) 143 if structure == nil { 144 fmt.Println("Unable to get structure from caps") 145 return 146 } 147 148 name := structure.Name() 149 fmt.Printf("Structure Name: %s\n", name) 150 } 151 152 _, err = splitmuxsink.Connect("pad-added", onPadAdded) 153 if err != nil { 154 return fmt.Errorf("failed to add pad: %w", err) 155 } 156 157 defer cancel() 158 go func() { 159 err := HandleBusMessagesCustom(ctx, pipeline, func(msg *gst.Message) { 160 switch msg.Type() { 161 case gst.MessageElement: 162 structure := msg.GetStructure() 163 name := structure.Name() 164 if name == "splitmuxsink-fragment-opened" { 165 runningTime, err := structure.GetValue("running-time") 166 if err != nil { 167 log.Debug(ctx, "splitmuxsink-fragment-opened error", "error", err) 168 cancel() 169 } 170 runningTimeInt, ok := runningTime.(uint64) 171 if !ok { 172 log.Warn(ctx, "splitmuxsink-fragment-opened not a uint64") 173 cancel() 174 } 175 log.Debug(ctx, "hls-check splitmuxsink-fragment-opened", "runningTime", runningTimeInt) 176 if err := ps.FragmentOpened(ctx, runningTimeInt); err != nil { 177 log.Debug(ctx, "fragment open error", "error", err) 178 cancel() 179 } 180 } 181 if name == "splitmuxsink-fragment-closed" { 182 runningTime, err := structure.GetValue("running-time") 183 if err != nil { 184 log.Debug(ctx, "splitmuxsink-fragment-closed error", "error", err) 185 cancel() 186 } 187 runningTimeInt, ok := runningTime.(uint64) 188 if !ok { 189 log.Warn(ctx, "splitmuxsink-fragment-closed not a uint64") 190 cancel() 191 } 192 log.Debug(ctx, "hls-check splitmuxsink-fragment-closed", "runningTime", runningTimeInt) 193 if err := ps.FragmentClosed(ctx, runningTimeInt); err != nil { 194 log.Debug(ctx, "fragment close error", "error", err) 195 cancel() 196 } 197 } 198 } 199 }) 200 if err != nil { 201 log.Log(ctx, "pipeline error", "error", err) 202 } 203 cancel() 204 }() 205 206 // Start the pipeline 207 if err := pipeline.SetState(gst.StatePlaying); err != nil { 208 return fmt.Errorf("error setting pipeline state: %w", err) 209 } 210 211 <-ctx.Done() 212 213 if err := pipeline.BlockSetState(gst.StateNull); err != nil { 214 return fmt.Errorf("error setting pipeline state: %w", err) 215 } 216 217 return nil 218} 219 220type PendingSegments struct { 221 segments []*Segment 222 lock sync.Mutex 223 rendition *M3U8Rendition 224} 225 226func NewPendingSegments(rendition *M3U8Rendition) *PendingSegments { 227 return &PendingSegments{ 228 segments: []*Segment{}, 229 lock: sync.Mutex{}, 230 rendition: rendition, 231 } 232} 233 234func (ps *PendingSegments) GetNextSegment(ctx context.Context) (*Segment, error) { 235 ps.lock.Lock() 236 defer ps.lock.Unlock() 237 log.Debug(ctx, "next segment") 238 seg := &Segment{ 239 Buf: &bytes.Buffer{}, 240 Time: time.Now(), 241 Closed: false, 242 } 243 ps.segments = append(ps.segments, seg) 244 return seg, nil 245} 246 247func (ps *PendingSegments) CloseSegment(ctx context.Context, seg *Segment) { 248 ps.lock.Lock() 249 defer ps.lock.Unlock() 250 log.Debug(ctx, "close segment", "MSN", seg.MSN) 251 seg.Closed = true 252 if err := ps.checkSegments(ctx); err != nil { 253 log.Debug(ctx, "faile to check segments segment") 254 } 255} 256 257func (ps *PendingSegments) FragmentOpened(ctx context.Context, t uint64) error { 258 ps.lock.Lock() 259 defer ps.lock.Unlock() 260 log.Debug(ctx, "fragment opened", "time", t) 261 if len(ps.segments) == 0 { 262 return fmt.Errorf("no pending segments") 263 } 264 for _, seg := range ps.segments { 265 if seg.StartTS == nil { 266 seg.StartTS = &t 267 break 268 } 269 } 270 if err := ps.checkSegments(ctx); err != nil { 271 return fmt.Errorf("failed to check segments: %w", err) 272 } 273 return nil 274} 275 276func (ps *PendingSegments) FragmentClosed(ctx context.Context, t uint64) error { 277 ps.lock.Lock() 278 defer ps.lock.Unlock() 279 log.Debug(ctx, "fragment closed", "time", t) 280 if len(ps.segments) == 0 { 281 return fmt.Errorf("no pending segments") 282 } 283 for _, seg := range ps.segments { 284 if seg.EndTS == nil { 285 seg.EndTS = &t 286 dur := *seg.EndTS - *seg.StartTS 287 seg.Duration = time.Duration(dur) 288 break 289 } 290 } 291 if err := ps.checkSegments(ctx); err != nil { 292 return fmt.Errorf("failed to check segments: %w", err) 293 } 294 return nil 295} 296 297// the tricky piece of the design here is that we need to expect GetNextSegment, 298// CloseSegment, FragmentOpened, and FragmentClosed to be called in any order. So 299// all of those functions call this one, and it checks if we have the necessary information 300// to finalize a segment and add it to our playlist. 301// only call if you're holding ps.lock! 302func (ps *PendingSegments) checkSegments(ctx context.Context) error { 303 pending := ps.segments[0] 304 if pending.StartTS != nil && pending.EndTS != nil && pending.Closed { 305 if err := ps.rendition.NewSegment(pending); err != nil { 306 return fmt.Errorf("failed to add new segment: %w", err) 307 } 308 log.Debug(ctx, "finalizing segment", "MSN", pending.MSN) 309 ps.segments = ps.segments[1:] 310 } 311 return nil 312}