Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.13 344 lines 9.3 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "strings" 9 "time" 10 11 "github.com/go-gst/go-gst/gst" 12 "github.com/go-gst/go-gst/gst/app" 13 "github.com/google/uuid" 14 "stream.place/streamplace/pkg/config" 15 "stream.place/streamplace/pkg/log" 16) 17 18type SegmentBuffer struct { 19 bytes []byte 20 pts *time.Duration 21 dur *time.Duration 22} 23 24type SegmentData struct { 25 Audio []SegmentBuffer 26 AudioCaps string 27 Video []SegmentBuffer 28 VideoCaps string 29} 30 31func RewriteAudioTimestamps(ctx context.Context, cli *config.CLI, input io.Reader, output io.Writer, doSmear bool) error { 32 bs, err := io.ReadAll(input) 33 if err != nil { 34 return err 35 } 36 seg, err := ToBuffers(ctx, bytes.NewReader(bs)) 37 if err != nil { 38 cli.DumpDebugSegment(ctx, "audio_smear_input", bytes.NewReader(bs)) 39 return err 40 } 41 42 if doSmear { 43 err = seg.Normalize(ctx) 44 if err != nil { 45 return err 46 } 47 } 48 49 return JoinAudioVideo(ctx, seg, output) 50} 51 52func (s *SegmentData) Normalize(ctx context.Context) error { 53 if len(s.Video) == 0 { 54 return fmt.Errorf("no video segments") 55 } 56 if len(s.Audio) == 0 { 57 return fmt.Errorf("no audio segments") 58 } 59 60 lastVideo := s.Video[len(s.Video)-1] 61 lastAudio := s.Audio[len(s.Audio)-1] 62 63 if lastVideo.pts == nil { 64 return fmt.Errorf("last video segment has no pts") 65 } 66 if lastAudio.pts == nil { 67 return fmt.Errorf("last audio segment has no pts") 68 } 69 70 videoEnd := lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds() 71 audioEnd := lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds() 72 73 diff := videoEnd - audioEnd 74 diffPerAudio := diff / int64(len(s.Audio)-1) 75 for i, audio := range s.Audio { 76 newPts := time.Duration(audio.pts.Nanoseconds() + (diffPerAudio * int64(i))) 77 audio.pts = &newPts 78 s.Audio[i] = audio 79 } 80 81 lastVideo = s.Video[len(s.Video)-1] 82 lastAudio = s.Audio[len(s.Audio)-1] 83 84 return nil 85} 86 87func ToBuffers(ctx context.Context, input io.Reader) (*SegmentData, error) { 88 89 ctx, cancel := context.WithCancel(ctx) 90 defer cancel() 91 ctx = log.WithLogValues(ctx, "func", "SplitAudioVideo") 92 93 pipelineSlice := []string{ 94 "appsrc name=mp4src ! qtdemux name=demux", 95 "demux.video_0 ! queue ! appsink sync=false name=videoappsink", 96 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 97 } 98 99 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 102 } 103 104 mp4src, err := pipeline.GetElementByName("mp4src") 105 if err != nil { 106 return nil, fmt.Errorf("failed to get mp4src element: %w", err) 107 } 108 src := app.SrcFromElement(mp4src) 109 if src == nil { 110 return nil, fmt.Errorf("failed to get mp4src element: %w", err) 111 } 112 src.SetCallbacks(&app.SourceCallbacks{ 113 NeedDataFunc: ReaderNeedData(ctx, input), 114 }) 115 116 seg := SegmentData{ 117 Audio: []SegmentBuffer{}, 118 Video: []SegmentBuffer{}, 119 } 120 121 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 122 if err != nil { 123 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 124 } 125 audioSink := app.SinkFromElement(audioSinkElem) 126 if audioSink == nil { 127 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 128 } 129 130 audioSink.SetCallbacks(&app.SinkCallbacks{ 131 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 132 sample := sink.PullSample() 133 if sample == nil { 134 return gst.FlowOK 135 } 136 137 // Retrieve the buffer from the sample. 138 buffer := sample.GetBuffer() 139 // log.Log(ctx, "audio buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration(), "dts", buffer.DecodingTimestamp()) 140 bs := buffer.Map(gst.MapRead).Bytes() 141 defer buffer.Unmap() 142 sinkPads, err := sink.GetSinkPads() 143 if err != nil { 144 src.Error("could not get sink pads", err) 145 return gst.FlowError 146 } 147 caps := sinkPads[0].GetCurrentCaps() 148 if caps != nil { 149 seg.AudioCaps = caps.String() 150 } 151 152 seg.Audio = append(seg.Audio, SegmentBuffer{ 153 bytes: bs, 154 pts: buffer.PresentationTimestamp().AsDuration(), 155 dur: buffer.Duration().AsDuration(), 156 }) 157 158 return gst.FlowOK 159 }, 160 }) 161 162 videoSinkElem, err := pipeline.GetElementByName("videoappsink") 163 if err != nil { 164 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 165 } 166 videoSink := app.SinkFromElement(videoSinkElem) 167 if videoSink == nil { 168 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 169 } 170 videoSink.SetCallbacks(&app.SinkCallbacks{ 171 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 172 sample := sink.PullSample() 173 if sample == nil { 174 return gst.FlowOK 175 } 176 177 // Retrieve the buffer from the sample. 178 buffer := sample.GetBuffer() 179 // log.Log(ctx, "video buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration()) 180 bs := buffer.Map(gst.MapRead).Bytes() 181 defer buffer.Unmap() 182 sinkPads, err := sink.GetSinkPads() 183 if err != nil { 184 src.Error("could not get sink pads", err) 185 return gst.FlowError 186 } 187 caps := sinkPads[0].GetCurrentCaps() 188 if caps != nil { 189 seg.VideoCaps = caps.String() 190 } 191 192 sb := SegmentBuffer{ 193 bytes: bs, 194 pts: buffer.PresentationTimestamp().AsDuration(), 195 dur: buffer.Duration().AsDuration(), 196 } 197 198 // log.Log(ctx, "video buffer", "presentation_timestamp", sb.pts, "duration", sb.dur) 199 if sb.pts == nil { 200 sink.Error("no video pts", fmt.Errorf("no video pts")) 201 return gst.FlowError 202 } 203 204 seg.Video = append(seg.Video, sb) 205 206 return gst.FlowOK 207 }, 208 }) 209 errCh := make(chan error) 210 go func() { 211 err := HandleBusMessages(ctx, pipeline) 212 errCh <- err 213 }() 214 215 defer func() { 216 if err != nil { 217 log.Error(ctx, "bus handler error", "error", err) 218 } 219 err = pipeline.SetState(gst.StateNull) 220 if err != nil { 221 log.Error(ctx, "failed to set pipeline to null state", "error", err) 222 } 223 }() 224 225 if err := pipeline.SetState(gst.StatePlaying); err != nil { 226 return nil, fmt.Errorf("failed to set pipeline state: %w", err) 227 } 228 229 pipelineErr := <-errCh 230 231 if pipelineErr != nil { 232 return nil, fmt.Errorf("pipeline error: %w", pipelineErr) 233 } 234 235 if len(seg.Video) == 0 { 236 return nil, fmt.Errorf("no video segments when rewriting audio") 237 } 238 if len(seg.Audio) == 0 { 239 return nil, fmt.Errorf("no audio segments when rewriting audio") 240 } 241 242 return &seg, nil 243} 244 245func JoinAudioVideo(ctx context.Context, seg *SegmentData, output io.Writer) error { 246 uu, _ := uuid.NewV7() 247 ctx = log.WithLogValues(ctx, "func", "JoinAudioVideo", "uuid", uu.String()) 248 249 pipelineSlice := []string{ 250 "mp4mux name=mux ! appsink sync=false name=mp4sink", 251 "appsrc name=videosrc format=time ! queue ! mux.video_0", 252 "appsrc name=audiosrc format=time ! queue ! mux.audio_0", 253 } 254 255 ctx, cancel := context.WithCancel(ctx) 256 defer cancel() 257 258 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 259 if err != nil { 260 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 261 } 262 263 errCh := make(chan error) 264 go func() { 265 err := HandleBusMessages(ctx, pipeline) 266 errCh <- err 267 }() 268 269 videoSrcElem, err := pipeline.GetElementByName("videosrc") 270 if err != nil { 271 return fmt.Errorf("failed to get videosrc element: %w", err) 272 } 273 videoSrc := app.SrcFromElement(videoSrcElem) 274 if videoSrc == nil { 275 return fmt.Errorf("failed to get videosrc element: %w", err) 276 } 277 videoSrc.SetCaps(gst.NewCapsFromString(seg.VideoCaps)) 278 for _, seg := range seg.Video { 279 buf := gst.NewBufferFromBytes(seg.bytes) 280 if seg.pts != nil { 281 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds()))) 282 } else { 283 videoSrc.Error("no video pts", fmt.Errorf("no video pts")) 284 return fmt.Errorf("no video pts") 285 } 286 if seg.dur != nil { 287 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds()))) 288 } 289 ret := videoSrc.PushBuffer(buf) 290 if ret != gst.FlowOK { 291 return fmt.Errorf("failed to push video buffer: %s", ret) 292 } 293 } 294 295 audioSrcElem, err := pipeline.GetElementByName("audiosrc") 296 if err != nil { 297 return fmt.Errorf("failed to get audiosrc element: %w", err) 298 } 299 audioSrc := app.SrcFromElement(audioSrcElem) 300 if audioSrc == nil { 301 return fmt.Errorf("failed to get audiosrc element: %w", err) 302 } 303 audioSrc.SetCaps(gst.NewCapsFromString(seg.AudioCaps)) 304 for _, seg := range seg.Audio { 305 buf := gst.NewBufferFromBytes(seg.bytes) 306 if seg.pts != nil { 307 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds()))) 308 } 309 if seg.dur != nil { 310 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds()))) 311 } 312 ret := audioSrc.PushBuffer(buf) 313 if ret != gst.FlowOK { 314 return fmt.Errorf("failed to push audio buffer: %s", ret) 315 } 316 } 317 318 videoSrc.EndStream() 319 audioSrc.EndStream() 320 mp4sinkElem, err := pipeline.GetElementByName("mp4sink") 321 if err != nil { 322 return fmt.Errorf("failed to get mp4sink element: %w", err) 323 } 324 mp4sink := app.SinkFromElement(mp4sinkElem) 325 if mp4sink == nil { 326 return fmt.Errorf("failed to get mp4sink element: %w", err) 327 } 328 mp4sink.SetCallbacks(&app.SinkCallbacks{ 329 NewSampleFunc: WriterNewSample(ctx, output), 330 }) 331 332 defer func() { 333 err = pipeline.SetState(gst.StateNull) 334 if err != nil { 335 log.Error(ctx, "failed to set pipeline to null state", "error", err) 336 } 337 }() 338 339 if err := pipeline.SetState(gst.StatePlaying); err != nil { 340 return fmt.Errorf("failed to set pipeline state: %w", err) 341 } 342 343 return <-errCh 344}