Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/sync-client-time 344 lines 9.3 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "strings" 9 "time" 10 11 "github.com/go-gst/go-gst/gst" 12 "github.com/go-gst/go-gst/gst/app" 13 "github.com/google/uuid" 14 "stream.place/streamplace/pkg/config" 15 "stream.place/streamplace/pkg/log" 16) 17 18type SegmentBuffer struct { 19 bytes []byte 20 pts *time.Duration 21 dur *time.Duration 22} 23 24type SegmentData struct { 25 Audio []SegmentBuffer 26 AudioCaps string 27 Video []SegmentBuffer 28 VideoCaps string 29} 30 31func RewriteAudioTimestamps(ctx context.Context, cli *config.CLI, input io.Reader, output io.Writer, doSmear bool) error { 32 bs, err := io.ReadAll(input) 33 if err != nil { 34 return err 35 } 36 seg, err := ToBuffers(ctx, bytes.NewReader(bs)) 37 if err != nil { 38 cli.DumpDebugSegment(ctx, "audio_smear_input", bytes.NewReader(bs)) 39 return err 40 } 41 42 if doSmear { 43 err = seg.Normalize(ctx) 44 if err != nil { 45 return err 46 } 47 } 48 49 return JoinAudioVideo(ctx, seg, output) 50} 51 52func (s *SegmentData) Normalize(ctx context.Context) error { 53 if len(s.Video) == 0 { 54 return fmt.Errorf("no video segments") 55 } 56 if len(s.Audio) == 0 { 57 return fmt.Errorf("no audio segments") 58 } 59 60 lastVideo := s.Video[len(s.Video)-1] 61 lastAudio := s.Audio[len(s.Audio)-1] 62 63 if lastVideo.pts == nil { 64 return fmt.Errorf("last video segment has no pts") 65 } 66 if lastAudio.pts == nil { 67 return fmt.Errorf("last audio segment has no pts") 68 } 69 70 videoEnd := lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds() 71 audioEnd := lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds() 72 73 diff := videoEnd - audioEnd 74 diffPerAudio := diff / int64(len(s.Audio)-1) 75 for i, audio := range s.Audio { 76 newPts := time.Duration(audio.pts.Nanoseconds() + (diffPerAudio * int64(i))) 77 audio.pts = &newPts 78 s.Audio[i] = audio 79 } 80 81 lastVideo = s.Video[len(s.Video)-1] 82 lastAudio = s.Audio[len(s.Audio)-1] 83 84 return nil 85} 86 87func ToBuffers(ctx context.Context, input io.Reader) (*SegmentData, error) { 88 89 ctx, cancel := context.WithCancel(ctx) 90 defer cancel() 91 ctx = log.WithLogValues(ctx, "func", "SplitAudioVideo") 92 93 pipelineSlice := []string{ 94 "appsrc name=mp4src ! qtdemux name=demux", 95 "demux.video_0 ! queue ! appsink sync=false name=videoappsink", 96 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 97 } 98 99 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 102 } 103 104 mp4src, err := pipeline.GetElementByName("mp4src") 105 if err != nil { 106 return nil, fmt.Errorf("failed to get mp4src element: %w", err) 107 } 108 src := app.SrcFromElement(mp4src) 109 if src == nil { 110 return nil, fmt.Errorf("failed to get mp4src element: %w", err) 111 } 112 src.SetCallbacks(&app.SourceCallbacks{ 113 NeedDataFunc: ReaderNeedData(ctx, input), 114 }) 115 116 seg := SegmentData{ 117 Audio: []SegmentBuffer{}, 118 Video: []SegmentBuffer{}, 119 } 120 121 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 122 if err != nil { 123 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 124 } 125 audioSink := app.SinkFromElement(audioSinkElem) 126 if audioSink == nil { 127 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 128 } 129 130 audioSink.SetCallbacks(&app.SinkCallbacks{ 131 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 132 sample := sink.PullSample() 133 if sample == nil { 134 return gst.FlowOK 135 } 136 137 // Retrieve the buffer from the sample. 138 buffer := sample.GetBuffer() 139 // log.Log(ctx, "audio buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration(), "dts", buffer.DecodingTimestamp()) 140 bs := buffer.Map(gst.MapRead).Bytes() 141 defer buffer.Unmap() 142 sinkPads, err := sink.GetSinkPads() 143 if err != nil { 144 src.Error("could not get sink pads", err) 145 return gst.FlowError 146 } 147 caps := sinkPads[0].GetCurrentCaps() 148 if caps != nil { 149 seg.AudioCaps = caps.String() 150 } 151 152 seg.Audio = append(seg.Audio, SegmentBuffer{ 153 bytes: bs, 154 pts: buffer.PresentationTimestamp().AsDuration(), 155 dur: buffer.Duration().AsDuration(), 156 }) 157 158 return gst.FlowOK 159 }, 160 }) 161 162 videoSinkElem, err := pipeline.GetElementByName("videoappsink") 163 if err != nil { 164 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 165 } 166 videoSink := app.SinkFromElement(videoSinkElem) 167 if videoSink == nil { 168 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 169 } 170 videoSink.SetCallbacks(&app.SinkCallbacks{ 171 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 172 sample := sink.PullSample() 173 if sample == nil { 174 return gst.FlowOK 175 } 176 177 // Retrieve the buffer from the sample. 178 buffer := sample.GetBuffer() 179 // log.Log(ctx, "video buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration()) 180 bs := buffer.Map(gst.MapRead).Bytes() 181 defer buffer.Unmap() 182 sinkPads, err := sink.GetSinkPads() 183 if err != nil { 184 src.Error("could not get sink pads", err) 185 return gst.FlowError 186 } 187 caps := sinkPads[0].GetCurrentCaps() 188 if caps != nil { 189 seg.VideoCaps = caps.String() 190 } 191 192 sb := SegmentBuffer{ 193 bytes: bs, 194 pts: buffer.PresentationTimestamp().AsDuration(), 195 dur: buffer.Duration().AsDuration(), 196 } 197 198 // log.Log(ctx, "video buffer", "presentation_timestamp", sb.pts, "duration", sb.dur) 199 if sb.pts == nil { 200 sink.Error("no video pts", fmt.Errorf("no video pts")) 201 return gst.FlowError 202 } 203 204 seg.Video = append(seg.Video, sb) 205 206 return gst.FlowOK 207 }, 208 }) 209 errCh := make(chan error) 210 go func() { 211 err := HandleBusMessages(ctx, pipeline) 212 errCh <- err 213 }() 214 215 defer func() { 216 if err != nil { 217 log.Error(ctx, "bus handler error", "error", err) 218 } 219 err = pipeline.SetState(gst.StateNull) 220 if err != nil { 221 log.Error(ctx, "failed to set pipeline to null state", "error", err) 222 } 223 }() 224 225 if err := pipeline.SetState(gst.StatePlaying); err != nil { 226 return nil, fmt.Errorf("failed to set pipeline state: %w", err) 227 } 228 229 pipelineErr := <-errCh 230 231 if pipelineErr != nil { 232 return nil, fmt.Errorf("pipeline error: %w", pipelineErr) 233 } 234 235 if len(seg.Video) == 0 { 236 return nil, fmt.Errorf("no video segments when rewriting audio") 237 } 238 if len(seg.Audio) == 0 { 239 return nil, fmt.Errorf("no audio segments when rewriting audio") 240 } 241 242 return &seg, nil 243} 244 245func JoinAudioVideo(ctx context.Context, seg *SegmentData, output io.Writer) error { 246 uu, _ := uuid.NewV7() 247 ctx = log.WithLogValues(ctx, "func", "JoinAudioVideo", "uuid", uu.String()) 248 249 pipelineSlice := []string{ 250 "mp4mux name=mux ! appsink sync=false name=mp4sink", 251 "appsrc name=videosrc format=time ! queue ! mux.video_0", 252 "appsrc name=audiosrc format=time ! queue ! mux.audio_0", 253 } 254 255 ctx, cancel := context.WithCancel(ctx) 256 defer cancel() 257 258 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 259 if err != nil { 260 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 261 } 262 263 errCh := make(chan error) 264 go func() { 265 err := HandleBusMessages(ctx, pipeline) 266 errCh <- err 267 }() 268 269 videoSrcElem, err := pipeline.GetElementByName("videosrc") 270 if err != nil { 271 return fmt.Errorf("failed to get videosrc element: %w", err) 272 } 273 videoSrc := app.SrcFromElement(videoSrcElem) 274 if videoSrc == nil { 275 return fmt.Errorf("failed to get videosrc element: %w", err) 276 } 277 videoSrc.SetCaps(gst.NewCapsFromString(seg.VideoCaps)) 278 for _, seg := range seg.Video { 279 buf := gst.NewBufferFromBytes(seg.bytes) 280 if seg.pts != nil { 281 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds()))) 282 } else { 283 videoSrc.Error("no video pts", fmt.Errorf("no video pts")) 284 return fmt.Errorf("no video pts") 285 } 286 if seg.dur != nil { 287 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds()))) 288 } 289 ret := videoSrc.PushBuffer(buf) 290 if ret != gst.FlowOK { 291 return fmt.Errorf("failed to push video buffer: %s", ret) 292 } 293 } 294 295 audioSrcElem, err := pipeline.GetElementByName("audiosrc") 296 if err != nil { 297 return fmt.Errorf("failed to get audiosrc element: %w", err) 298 } 299 audioSrc := app.SrcFromElement(audioSrcElem) 300 if audioSrc == nil { 301 return fmt.Errorf("failed to get audiosrc element: %w", err) 302 } 303 audioSrc.SetCaps(gst.NewCapsFromString(seg.AudioCaps)) 304 for _, seg := range seg.Audio { 305 buf := gst.NewBufferFromBytes(seg.bytes) 306 if seg.pts != nil { 307 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds()))) 308 } 309 if seg.dur != nil { 310 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds()))) 311 } 312 ret := audioSrc.PushBuffer(buf) 313 if ret != gst.FlowOK { 314 return fmt.Errorf("failed to push audio buffer: %s", ret) 315 } 316 } 317 318 videoSrc.EndStream() 319 audioSrc.EndStream() 320 mp4sinkElem, err := pipeline.GetElementByName("mp4sink") 321 if err != nil { 322 return fmt.Errorf("failed to get mp4sink element: %w", err) 323 } 324 mp4sink := app.SinkFromElement(mp4sinkElem) 325 if mp4sink == nil { 326 return fmt.Errorf("failed to get mp4sink element: %w", err) 327 } 328 mp4sink.SetCallbacks(&app.SinkCallbacks{ 329 NewSampleFunc: WriterNewSample(ctx, output), 330 }) 331 332 defer func() { 333 err = pipeline.SetState(gst.StateNull) 334 if err != nil { 335 log.Error(ctx, "failed to set pipeline to null state", "error", err) 336 } 337 }() 338 339 if err := pipeline.SetState(gst.StatePlaying); err != nil { 340 return fmt.Errorf("failed to set pipeline state: %w", err) 341 } 342 343 return <-errCh 344}