Live video on the AT Protocol
package media

import (
    "bytes"
    "context"
    "fmt"
    "strings"
    "sync"
    "time"

    "github.com/go-gst/go-gst/gst"
    "github.com/go-gst/go-gst/gst/app"

    "stream.place/streamplace/pkg/log"
)

// ToHLS consumes the concatenated stream for a user and rendition, remuxes it
// into MPEG-TS fragments via splitmuxsink, and finalizes each fragment as an
// HLS segment on the rendition's playlist.
func (mm *MediaManager) ToHLS(ctx context.Context, user string, rendition string, m3u8 *M3U8) error {
    ctx = log.WithLogValues(ctx, "GStreamerFunc", "ToHLS", "rendition", rendition)

    pipelineSlice := []string{
        "h264parse name=videoparse",
        "opusdec use-inband-fec=true name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc",
    }

    pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
    if err != nil {
        return fmt.Errorf("error creating ToHLS pipeline: %w", err)
    }

    outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm)
    if err != nil {
        return fmt.Errorf("failed to get output queue: %w", err)
    }

    videoParse, err := pipeline.GetElementByName("videoparse")
    if err != nil {
        return fmt.Errorf("failed to get video parse element from pipeline: %w", err)
    }
    err = outputQueue.Link(videoParse)
    if err != nil {
        return fmt.Errorf("failed to link output queue to video parse: %w", err)
    }

    audioParse, err := pipeline.GetElementByName("audioparse")
    if err != nil {
        return fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
    }
    err = outputQueue.Link(audioParse)
    if err != nil {
        return fmt.Errorf("failed to link output queue to audio parse: %w", err)
    }

    // max-size-bytes=1 forces splitmuxsink to start a new fragment at every
    // opportunity (i.e. each keyframe); each fragment becomes one HLS segment.
    splitmuxsink, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
        "name":           "mux",
        "async-finalize": true,
        "sink-factory":   "appsink",
        "muxer-factory":  "mpegtsmux",
        "max-size-bytes": 1,
    })
    if err != nil {
        return err
    }

    r := m3u8.GetRendition(rendition)
    ps := NewPendingSegments(r)

    p := splitmuxsink.GetRequestPad("video")
    if p == nil {
        return fmt.Errorf("failed to get video pad")
    }
    p = splitmuxsink.GetRequestPad("audio_%u")
    if p == nil {
        return fmt.Errorf("failed to get audio pad")
    }

    err = pipeline.Add(splitmuxsink)
    if err != nil {
        return fmt.Errorf("error adding splitmuxsink to ToHLS pipeline: %w", err)
    }

    err = videoParse.Link(splitmuxsink)
    if err != nil {
        return fmt.Errorf("error linking videoparse to splitmuxsink: %w", err)
    }

    audioenc, err := pipeline.GetElementByName("audioenc")
    if err != nil {
        return fmt.Errorf("error getting audioenc from ToHLS pipeline: %w", err)
    }
    err = audioenc.Link(splitmuxsink)
    if err != nil {
        return fmt.Errorf("error linking audioenc to splitmuxsink: %w", err)
    }

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    go func() {
        select {
        case <-ctx.Done():
            return
        case <-done:
            cancel()
        }
    }()

    // splitmuxsink creates a fresh appsink for every fragment; pair each one
    // with a pending segment and buffer its bytes until EOS.
    _, err = splitmuxsink.Connect("sink-added", func(split, sinkEle *gst.Element) {
        log.Debug(ctx, "hls-check sink-added")
        vf, err := ps.GetNextSegment(ctx)
        if err != nil {
            panic(err)
        }
        appsink := app.SinkFromElement(sinkEle)
        appsink.SetCallbacks(&app.SinkCallbacks{
            NewSampleFunc: WriterNewSample(ctx, vf.Buf),
            EOSFunc: func(sink *app.Sink) {
                log.Debug(ctx, "hls-check segment EOS", "buf", vf.Buf.Len())
                ps.CloseSegment(ctx, vf)
            },
        })
    })
    if err != nil {
        return fmt.Errorf("failed to connect sink-added handler: %w", err)
    }

    onPadAdded := func(element *gst.Element, pad *gst.Pad) {
        caps := pad.GetCurrentCaps()
        if caps == nil {
            log.Debug(ctx, "unable to get pad caps")
            return
        }

        log.Debug(ctx, "new pad added", "pad", pad.GetName(), "caps", caps.String())

        structure := caps.GetStructureAt(0)
        if structure == nil {
            log.Debug(ctx, "unable to get structure from caps")
            return
        }

        log.Debug(ctx, "pad structure", "name", structure.Name())
    }

    _, err = splitmuxsink.Connect("pad-added", onPadAdded)
    if err != nil {
        return fmt.Errorf("failed to connect pad-added handler: %w", err)
    }

    // Watch the bus for splitmuxsink's fragment-opened/closed element
    // messages; their running times give each segment its start and end.
    go func() {
        err := HandleBusMessagesCustom(ctx, pipeline, func(msg *gst.Message) {
            switch msg.Type() {
            case gst.MessageElement:
                structure := msg.GetStructure()
                name := structure.Name()
                if name == "splitmuxsink-fragment-opened" {
                    runningTime, err := structure.GetValue("running-time")
                    if err != nil {
                        log.Debug(ctx, "splitmuxsink-fragment-opened error", "error", err)
                        cancel()
                        return
                    }
                    runningTimeInt, ok := runningTime.(uint64)
                    if !ok {
                        log.Warn(ctx, "splitmuxsink-fragment-opened running-time is not a uint64")
                        cancel()
                        return
                    }
                    log.Debug(ctx, "hls-check splitmuxsink-fragment-opened", "runningTime", runningTimeInt)
                    if err := ps.FragmentOpened(ctx, runningTimeInt); err != nil {
                        log.Debug(ctx, "fragment open error", "error", err)
                        cancel()
                    }
                }
                if name == "splitmuxsink-fragment-closed" {
                    runningTime, err := structure.GetValue("running-time")
                    if err != nil {
                        log.Debug(ctx, "splitmuxsink-fragment-closed error", "error", err)
                        cancel()
                        return
                    }
                    runningTimeInt, ok := runningTime.(uint64)
                    if !ok {
                        log.Warn(ctx, "splitmuxsink-fragment-closed running-time is not a uint64")
                        cancel()
                        return
                    }
                    log.Debug(ctx, "hls-check splitmuxsink-fragment-closed", "runningTime", runningTimeInt)
                    if err := ps.FragmentClosed(ctx, runningTimeInt); err != nil {
                        log.Debug(ctx, "fragment close error", "error", err)
                        cancel()
                    }
                }
            }
        })
        if err != nil {
            log.Log(ctx, "pipeline error", "error", err)
        }
        cancel()
    }()

    // Start the pipeline and run until the context is canceled.
    if err := pipeline.SetState(gst.StatePlaying); err != nil {
        return fmt.Errorf("error setting pipeline state: %w", err)
    }

    <-ctx.Done()

    if err := pipeline.BlockSetState(gst.StateNull); err != nil {
        return fmt.Errorf("error setting pipeline state to null: %w", err)
    }

    return nil
}

// PendingSegments tracks segments that have been started but not yet
// finalized onto the playlist.
type PendingSegments struct {
    segments  []*Segment
    lock      sync.Mutex
    rendition *M3U8Rendition
}

func NewPendingSegments(rendition *M3U8Rendition) *PendingSegments {
    return &PendingSegments{
        segments:  []*Segment{},
        rendition: rendition,
    }
}

// GetNextSegment allocates a new pending segment with an empty buffer.
func (ps *PendingSegments) GetNextSegment(ctx context.Context) (*Segment, error) {
    ps.lock.Lock()
    defer ps.lock.Unlock()
    log.Debug(ctx, "next segment")
    seg := &Segment{
        Buf:    &bytes.Buffer{},
        Time:   time.Now(),
        Closed: false,
    }
    ps.segments = append(ps.segments, seg)
    return seg, nil
}

// CloseSegment marks a segment's byte stream as complete (appsink EOS).
func (ps *PendingSegments) CloseSegment(ctx context.Context, seg *Segment) {
    ps.lock.Lock()
    defer ps.lock.Unlock()
    log.Debug(ctx, "close segment", "MSN", seg.MSN)
    seg.Closed = true
    if err := ps.checkSegments(ctx); err != nil {
        log.Debug(ctx, "failed to check segments", "error", err)
    }
}

// FragmentOpened records a fragment's start timestamp on the oldest segment
// that doesn't have one yet.
func (ps *PendingSegments) FragmentOpened(ctx context.Context, t uint64) error {
    ps.lock.Lock()
    defer ps.lock.Unlock()
    log.Debug(ctx, "fragment opened", "time", t)
    if len(ps.segments) == 0 {
        return fmt.Errorf("no pending segments")
    }
    for _, seg := range ps.segments {
        if seg.StartTS == nil {
            seg.StartTS = &t
            break
        }
    }
    if err := ps.checkSegments(ctx); err != nil {
        return fmt.Errorf("failed to check segments: %w", err)
    }
    return nil
}

// FragmentClosed records a fragment's end timestamp on the oldest segment
// that doesn't have one yet, and computes its duration.
func (ps *PendingSegments) FragmentClosed(ctx context.Context, t uint64) error {
    ps.lock.Lock()
    defer ps.lock.Unlock()
    log.Debug(ctx, "fragment closed", "time", t)
    if len(ps.segments) == 0 {
        return fmt.Errorf("no pending segments")
    }
    for _, seg := range ps.segments {
        if seg.EndTS == nil {
            if seg.StartTS == nil {
                return fmt.Errorf("fragment closed before fragment opened")
            }
            seg.EndTS = &t
            seg.Duration = time.Duration(*seg.EndTS - *seg.StartTS)
            break
        }
    }
    if err := ps.checkSegments(ctx); err != nil {
        return fmt.Errorf("failed to check segments: %w", err)
    }
    return nil
}

// The tricky piece of the design here is that GetNextSegment, CloseSegment,
// FragmentOpened, and FragmentClosed may be called in any order. So all of
// those functions call this one, which checks whether we have the necessary
// information to finalize the oldest segment and add it to our playlist.
// Only call while holding ps.lock!
func (ps *PendingSegments) checkSegments(ctx context.Context) error {
    if len(ps.segments) == 0 {
        return nil
    }
    pending := ps.segments[0]
    if pending.StartTS != nil && pending.EndTS != nil && pending.Closed {
        if err := ps.rendition.NewSegment(pending); err != nil {
            return fmt.Errorf("failed to add new segment: %w", err)
        }
        log.Debug(ctx, "finalizing segment", "MSN", pending.MSN)
        ps.segments = ps.segments[1:]
    }
    return nil
}
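
The WriterNewSample factory used in the sink-added handler is defined elsewhere in the package. For readers unfamiliar with go-gst's appsink callbacks, here is a minimal sketch of what such a helper might look like, assuming it simply drains each sample into the segment's buffer and reusing the imports from the file above; the name and details are illustrative, not the actual Streamplace implementation:

// Hypothetical sketch only: WriterNewSample's real implementation lives
// elsewhere in this package. This version returns an app.SinkCallbacks
// NewSampleFunc that copies every mapped sample into the given buffer.
func writerNewSampleSketch(ctx context.Context, buf *bytes.Buffer) func(*app.Sink) gst.FlowReturn {
    return func(sink *app.Sink) gst.FlowReturn {
        // Pull the next sample; nil means the stream has ended.
        sample := sink.PullSample()
        if sample == nil {
            return gst.FlowEOS
        }
        buffer := sample.GetBuffer()
        if buffer == nil {
            return gst.FlowError
        }
        // Map the buffer read-only and append its bytes to the segment.
        mapInfo := buffer.Map(gst.MapRead)
        defer buffer.Unmap()
        if _, err := buf.Write(mapInfo.Bytes()); err != nil {
            return gst.FlowError
        }
        return gst.FlowOK
    }
}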
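
Since the any-order contract in checkSegments is the subtle part of the design, a short usage sketch may help. This is illustrative only: `rendition` stands in for a *M3U8Rendition obtained via m3u8.GetRendition as in ToHLS above, and the timestamps are arbitrary nanosecond running times:

// Illustrative only: shows that a segment finalizes once all three facts are
// known (start timestamp, end timestamp, buffer closed), regardless of the
// order in which GStreamer delivers the events.
func pendingSegmentsSketch(ctx context.Context, rendition *M3U8Rendition) error {
    ps := NewPendingSegments(rendition)

    // sink-added fired: a new appsink (and thus a new segment) exists.
    seg, err := ps.GetNextSegment(ctx)
    if err != nil {
        return err
    }

    // splitmuxsink-fragment-opened arrives with running-time 0.
    if err := ps.FragmentOpened(ctx, 0); err != nil {
        return err
    }

    // The appsink hits EOS before the fragment-closed bus message shows up.
    ps.CloseSegment(ctx, seg)

    // fragment-closed arrives last; this call computes the 2s duration and
    // finalizes the segment onto the rendition's playlist.
    return ps.FragmentClosed(ctx, uint64(2*time.Second))
}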