Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/version-label 350 lines 9.4 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "strings" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "github.com/google/uuid" 15 "stream.place/streamplace/pkg/log" 16) 17 18type SegmentBuffer struct { 19 bytes []byte 20 pts *time.Duration 21 dur *time.Duration 22} 23 24type SegmentData struct { 25 Audio []SegmentBuffer 26 AudioCaps string 27 Video []SegmentBuffer 28 VideoCaps string 29} 30 31func SmearAudioTimestamps(ctx context.Context, input io.Reader, output io.Writer) error { 32 bs, err := io.ReadAll(input) 33 if err != nil { 34 return err 35 } 36 seg, err := ToBuffers(ctx, bytes.NewReader(bs)) 37 if err != nil { 38 // Write the input bytes to a file for debugging 39 debugFile := fmt.Sprintf("audio_smear_debug_%s.mp4", uuid.New().String()) 40 err = os.WriteFile(debugFile, bs, 0644) 41 if err != nil { 42 log.Log(ctx, "failed to write debug file", "error", err, "path", debugFile) 43 } else { 44 log.Log(ctx, "wrote debug file", "path", debugFile) 45 } 46 return err 47 } 48 49 err = seg.Normalize(ctx) 50 if err != nil { 51 return err 52 } 53 54 return JoinAudioVideo(ctx, seg, output) 55} 56 57func (s *SegmentData) Normalize(ctx context.Context) error { 58 if len(s.Video) == 0 { 59 return fmt.Errorf("no video segments") 60 } 61 if len(s.Audio) == 0 { 62 return fmt.Errorf("no audio segments") 63 } 64 65 lastVideo := s.Video[len(s.Video)-1] 66 lastAudio := s.Audio[len(s.Audio)-1] 67 68 if lastVideo.pts == nil { 69 return fmt.Errorf("last video segment has no pts") 70 } 71 if lastAudio.pts == nil { 72 return fmt.Errorf("last audio segment has no pts") 73 } 74 75 videoEnd := lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds() 76 audioEnd := lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds() 77 78 diff := videoEnd - audioEnd 79 diffPerAudio := diff / int64(len(s.Audio)-1) 80 for i, audio := range s.Audio { 81 newPts := time.Duration(audio.pts.Nanoseconds() + (diffPerAudio * int64(i))) 82 audio.pts = &newPts 83 s.Audio[i] = audio 84 } 85 86 lastVideo = s.Video[len(s.Video)-1] 87 lastAudio = s.Audio[len(s.Audio)-1] 88 89 return nil 90} 91 92func ToBuffers(ctx context.Context, input io.Reader) (*SegmentData, error) { 93 ctx = log.WithLogValues(ctx, "func", "SplitAudioVideo") 94 95 pipelineSlice := []string{ 96 "appsrc name=mp4src ! qtdemux name=demux", 97 "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! appsink sync=false name=videoappsink", 98 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 99 } 100 101 ctx, cancel := context.WithCancel(ctx) 102 defer cancel() 103 104 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 105 if err != nil { 106 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 107 } 108 109 errCh := make(chan error) 110 go func() { 111 err := HandleBusMessages(ctx, pipeline) 112 cancel() 113 errCh <- err 114 close(errCh) 115 }() 116 117 defer func() { 118 err := <-errCh 119 if err != nil { 120 log.Error(ctx, "bus handler error", "error", err) 121 } 122 err = pipeline.BlockSetState(gst.StateNull) 123 if err != nil { 124 log.Error(ctx, "failed to set pipeline to null state", "error", err) 125 } 126 }() 127 128 mp4src, err := pipeline.GetElementByName("mp4src") 129 if err != nil { 130 return nil, fmt.Errorf("failed to get mp4src element: %w", err) 131 } 132 src := app.SrcFromElement(mp4src) 133 if src == nil { 134 return nil, fmt.Errorf("failed to get mp4src element: %w", err) 135 } 136 src.SetCallbacks(&app.SourceCallbacks{ 137 NeedDataFunc: ReaderNeedData(ctx, input), 138 }) 139 140 seg := SegmentData{ 141 Audio: []SegmentBuffer{}, 142 Video: []SegmentBuffer{}, 143 } 144 145 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 146 if err != nil { 147 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 148 } 149 audioSink := app.SinkFromElement(audioSinkElem) 150 if audioSink == nil { 151 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 152 } 153 154 audioSink.SetCallbacks(&app.SinkCallbacks{ 155 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 156 sample := sink.PullSample() 157 if sample == nil { 158 return gst.FlowOK 159 } 160 161 // Retrieve the buffer from the sample. 162 buffer := sample.GetBuffer() 163 // log.Log(ctx, "audio buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration(), "dts", buffer.DecodingTimestamp()) 164 bs := buffer.Map(gst.MapRead).Bytes() 165 defer buffer.Unmap() 166 sinkPads, err := sink.GetSinkPads() 167 if err != nil { 168 src.Error("could not get sink pads", err) 169 return gst.FlowError 170 } 171 caps := sinkPads[0].GetCurrentCaps() 172 if caps != nil { 173 seg.AudioCaps = caps.String() 174 } 175 176 seg.Audio = append(seg.Audio, SegmentBuffer{ 177 bytes: bs, 178 pts: buffer.PresentationTimestamp().AsDuration(), 179 dur: buffer.Duration().AsDuration(), 180 }) 181 182 return gst.FlowOK 183 }, 184 }) 185 186 videoSinkElem, err := pipeline.GetElementByName("videoappsink") 187 if err != nil { 188 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 189 } 190 videoSink := app.SinkFromElement(videoSinkElem) 191 if videoSink == nil { 192 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 193 } 194 videoSink.SetCallbacks(&app.SinkCallbacks{ 195 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 196 sample := sink.PullSample() 197 if sample == nil { 198 return gst.FlowOK 199 } 200 201 // Retrieve the buffer from the sample. 202 buffer := sample.GetBuffer() 203 // log.Log(ctx, "video buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration()) 204 bs := buffer.Map(gst.MapRead).Bytes() 205 defer buffer.Unmap() 206 sinkPads, err := sink.GetSinkPads() 207 if err != nil { 208 src.Error("could not get sink pads", err) 209 return gst.FlowError 210 } 211 caps := sinkPads[0].GetCurrentCaps() 212 if caps != nil { 213 seg.VideoCaps = caps.String() 214 } 215 216 sb := SegmentBuffer{ 217 bytes: bs, 218 pts: buffer.PresentationTimestamp().AsDuration(), 219 dur: buffer.Duration().AsDuration(), 220 } 221 222 // log.Log(ctx, "video buffer", "presentation_timestamp", sb.pts, "duration", sb.dur) 223 if sb.pts == nil { 224 sink.Error("no video pts", fmt.Errorf("no video pts")) 225 return gst.FlowError 226 } 227 228 seg.Video = append(seg.Video, sb) 229 230 return gst.FlowOK 231 }, 232 }) 233 234 if err := pipeline.SetState(gst.StatePlaying); err != nil { 235 return nil, fmt.Errorf("failed to set pipeline state: %w", err) 236 } 237 238 <-ctx.Done() 239 240 return &seg, <-errCh 241} 242 243func JoinAudioVideo(ctx context.Context, seg *SegmentData, output io.Writer) error { 244 uu, _ := uuid.NewV7() 245 ctx = log.WithLogValues(ctx, "func", "JoinAudioVideo", "uuid", uu.String()) 246 247 pipelineSlice := []string{ 248 "mp4mux name=mux ! appsink sync=false name=mp4sink", 249 "appsrc name=videosrc format=time ! queue ! mux.video_0", 250 "appsrc name=audiosrc format=time ! queue ! mux.audio_0", 251 } 252 253 ctx, cancel := context.WithCancel(ctx) 254 defer cancel() 255 256 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 257 if err != nil { 258 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 259 } 260 261 errCh := make(chan error) 262 go func() { 263 err := HandleBusMessages(ctx, pipeline) 264 cancel() 265 errCh <- err 266 close(errCh) 267 }() 268 269 defer func() { 270 err := <-errCh 271 if err != nil { 272 log.Error(ctx, "bus handler error", "error", err) 273 } 274 err = pipeline.BlockSetState(gst.StateNull) 275 if err != nil { 276 log.Error(ctx, "failed to set pipeline to null state", "error", err) 277 } 278 }() 279 280 videoSrcElem, err := pipeline.GetElementByName("videosrc") 281 if err != nil { 282 return fmt.Errorf("failed to get videosrc element: %w", err) 283 } 284 videoSrc := app.SrcFromElement(videoSrcElem) 285 if videoSrc == nil { 286 return fmt.Errorf("failed to get videosrc element: %w", err) 287 } 288 videoSrc.SetCaps(gst.NewCapsFromString(seg.VideoCaps)) 289 for _, seg := range seg.Video { 290 buf := gst.NewBufferFromBytes(seg.bytes) 291 if seg.pts != nil { 292 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds()))) 293 } else { 294 videoSrc.Error("no video pts", fmt.Errorf("no video pts")) 295 return fmt.Errorf("no video pts") 296 } 297 if seg.dur != nil { 298 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds()))) 299 } 300 ret := videoSrc.PushBuffer(buf) 301 if ret != gst.FlowOK { 302 return fmt.Errorf("failed to push video buffer: %s", ret) 303 } 304 } 305 306 audioSrcElem, err := pipeline.GetElementByName("audiosrc") 307 if err != nil { 308 return fmt.Errorf("failed to get audiosrc element: %w", err) 309 } 310 audioSrc := app.SrcFromElement(audioSrcElem) 311 if audioSrc == nil { 312 return fmt.Errorf("failed to get audiosrc element: %w", err) 313 } 314 audioSrc.SetCaps(gst.NewCapsFromString(seg.AudioCaps)) 315 for _, seg := range seg.Audio { 316 buf := gst.NewBufferFromBytes(seg.bytes) 317 if seg.pts != nil { 318 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds()))) 319 } 320 if seg.dur != nil { 321 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds()))) 322 } 323 ret := audioSrc.PushBuffer(buf) 324 if ret != gst.FlowOK { 325 return fmt.Errorf("failed to push audio buffer: %s", ret) 326 } 327 } 328 329 videoSrc.EndStream() 330 audioSrc.EndStream() 331 mp4sinkElem, err := pipeline.GetElementByName("mp4sink") 332 if err != nil { 333 return fmt.Errorf("failed to get mp4sink element: %w", err) 334 } 335 mp4sink := app.SinkFromElement(mp4sinkElem) 336 if mp4sink == nil { 337 return fmt.Errorf("failed to get mp4sink element: %w", err) 338 } 339 mp4sink.SetCallbacks(&app.SinkCallbacks{ 340 NewSampleFunc: WriterNewSample(ctx, output), 341 }) 342 343 if err := pipeline.SetState(gst.StatePlaying); err != nil { 344 return fmt.Errorf("failed to set pipeline state: %w", err) 345 } 346 347 <-ctx.Done() 348 349 return <-errCh 350}