Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/feed-error-handling 357 lines 9.7 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "strings" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "github.com/google/uuid" 15 "stream.place/streamplace/pkg/log" 16) 17 18type SegmentBuffer struct { 19 bytes []byte 20 pts *time.Duration 21 dur *time.Duration 22} 23 24type SegmentData struct { 25 Audio []SegmentBuffer 26 AudioCaps string 27 Video []SegmentBuffer 28 VideoCaps string 29} 30 31func SmearAudioTimestamps(ctx context.Context, input io.Reader, output io.Writer) error { 32 bs, err := io.ReadAll(input) 33 if err != nil { 34 return err 35 } 36 seg, err := ToBuffers(ctx, bytes.NewReader(bs)) 37 if err != nil { 38 // Write the input bytes to a file for debugging 39 debugFile := fmt.Sprintf("audio_smear_debug_%s.mp4", uuid.New().String()) 40 err = os.WriteFile(debugFile, bs, 0644) 41 if err != nil { 42 log.Log(ctx, "failed to write debug file", "error", err, "path", debugFile) 43 } else { 44 log.Log(ctx, "wrote debug file", "path", debugFile) 45 } 46 return err 47 } 48 49 err = seg.Normalize(ctx) 50 if err != nil { 51 return err 52 } 53 54 return JoinAudioVideo(ctx, seg, output) 55} 56 57func (s *SegmentData) Normalize(ctx context.Context) error { 58 if len(s.Video) == 0 { 59 return fmt.Errorf("no video segments") 60 } 61 if len(s.Audio) == 0 { 62 return fmt.Errorf("no audio segments") 63 } 64 65 lastVideo := s.Video[len(s.Video)-1] 66 lastAudio := s.Audio[len(s.Audio)-1] 67 68 if lastVideo.pts == nil { 69 return fmt.Errorf("last video segment has no pts") 70 } 71 if lastAudio.pts == nil { 72 return fmt.Errorf("last audio segment has no pts") 73 } 74 75 videoEnd := lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds() 76 audioEnd := lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds() 77 78 diff := videoEnd - audioEnd 79 diffPerAudio := diff / int64(len(s.Audio)-1) 80 for i, audio := range s.Audio { 81 newPts := time.Duration(audio.pts.Nanoseconds() + (diffPerAudio * int64(i))) 82 audio.pts = &newPts 83 s.Audio[i] = audio 84 } 85 86 lastVideo = s.Video[len(s.Video)-1] 87 lastAudio = s.Audio[len(s.Audio)-1] 88 videoEnd = lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds() 89 audioEnd = lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds() 90 91 return nil 92} 93 94func ToBuffers(ctx context.Context, input io.Reader) (*SegmentData, error) { 95 ctx = log.WithLogValues(ctx, "func", "SplitAudioVideo") 96 97 pipelineSlice := []string{ 98 "appsrc name=mp4src ! qtdemux name=demux", 99 "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! appsink sync=false name=videoappsink", 100 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 101 } 102 103 ctx, cancel := context.WithCancel(ctx) 104 105 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 106 if err != nil { 107 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 108 } 109 110 errCh := make(chan error) 111 go func() { 112 err := HandleBusMessages(ctx, pipeline) 113 cancel() 114 errCh <- err 115 close(errCh) 116 }() 117 118 defer func() { 119 cancel() 120 err := <-errCh 121 if err != nil { 122 log.Error(ctx, "bus handler error", "error", err) 123 } 124 err = pipeline.BlockSetState(gst.StateNull) 125 if err != nil { 126 log.Error(ctx, "failed to set pipeline to null state", "error", err) 127 } 128 }() 129 130 mp4src, err := pipeline.GetElementByName("mp4src") 131 if err != nil { 132 return nil, fmt.Errorf("failed to get mp4src element: %w", err) 133 } 134 src := app.SrcFromElement(mp4src) 135 if src == nil { 136 return nil, fmt.Errorf("failed to get mp4src element: %w", err) 137 } 138 src.SetCallbacks(&app.SourceCallbacks{ 139 NeedDataFunc: ReaderNeedData(ctx, input), 140 }) 141 142 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 143 if err != nil { 144 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 145 } 146 audioSink := app.SinkFromElement(audioSinkElem) 147 if audioSink == nil { 148 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 149 } 150 151 seg := SegmentData{ 152 Audio: []SegmentBuffer{}, 153 Video: []SegmentBuffer{}, 154 } 155 156 audioSink.SetCallbacks(&app.SinkCallbacks{ 157 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 158 sample := sink.PullSample() 159 if sample == nil { 160 return gst.FlowOK 161 } 162 163 // Retrieve the buffer from the sample. 164 buffer := sample.GetBuffer() 165 // log.Log(ctx, "audio buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration(), "dts", buffer.DecodingTimestamp()) 166 bs := buffer.Map(gst.MapRead).Bytes() 167 defer buffer.Unmap() 168 sinkPads, err := sink.GetSinkPads() 169 if err != nil { 170 src.Error("could not get sink pads", err) 171 return gst.FlowError 172 } 173 caps := sinkPads[0].GetCurrentCaps() 174 if caps != nil { 175 seg.AudioCaps = caps.String() 176 } 177 178 seg.Audio = append(seg.Audio, SegmentBuffer{ 179 bytes: bs, 180 pts: buffer.PresentationTimestamp().AsDuration(), 181 dur: buffer.Duration().AsDuration(), 182 }) 183 184 if err != nil { 185 src.Error("could not get sink pads", err) 186 return gst.FlowError 187 } 188 189 return gst.FlowOK 190 }, 191 }) 192 193 videoSinkElem, err := pipeline.GetElementByName("videoappsink") 194 if err != nil { 195 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 196 } 197 videoSink := app.SinkFromElement(videoSinkElem) 198 if videoSink == nil { 199 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 200 } 201 videoSink.SetCallbacks(&app.SinkCallbacks{ 202 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 203 sample := sink.PullSample() 204 if sample == nil { 205 return gst.FlowOK 206 } 207 208 // Retrieve the buffer from the sample. 209 buffer := sample.GetBuffer() 210 // log.Log(ctx, "video buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration()) 211 bs := buffer.Map(gst.MapRead).Bytes() 212 defer buffer.Unmap() 213 sinkPads, err := sink.GetSinkPads() 214 if err != nil { 215 src.Error("could not get sink pads", err) 216 return gst.FlowError 217 } 218 caps := sinkPads[0].GetCurrentCaps() 219 if caps != nil { 220 seg.VideoCaps = caps.String() 221 } 222 223 sb := SegmentBuffer{ 224 bytes: bs, 225 pts: buffer.PresentationTimestamp().AsDuration(), 226 dur: buffer.Duration().AsDuration(), 227 } 228 229 // log.Log(ctx, "video buffer", "presentation_timestamp", sb.pts, "duration", sb.dur) 230 if sb.pts == nil { 231 sink.Error("no video pts", fmt.Errorf("no video pts")) 232 return gst.FlowError 233 } 234 235 seg.Video = append(seg.Video, sb) 236 237 return gst.FlowOK 238 }, 239 }) 240 241 pipeline.SetState(gst.StatePlaying) 242 243 <-ctx.Done() 244 245 return &seg, <-errCh 246} 247 248func JoinAudioVideo(ctx context.Context, seg *SegmentData, output io.Writer) error { 249 uu, _ := uuid.NewV7() 250 ctx = log.WithLogValues(ctx, "func", "JoinAudioVideo", "uuid", uu.String()) 251 252 pipelineSlice := []string{ 253 "mp4mux name=mux ! appsink sync=false name=mp4sink", 254 "appsrc name=videosrc format=time ! queue ! mux.video_0", 255 "appsrc name=audiosrc format=time ! queue ! mux.audio_0", 256 } 257 258 ctx, cancel := context.WithCancel(ctx) 259 260 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 261 if err != nil { 262 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 263 } 264 265 errCh := make(chan error) 266 go func() { 267 err := HandleBusMessages(ctx, pipeline) 268 cancel() 269 errCh <- err 270 close(errCh) 271 }() 272 273 defer func() { 274 cancel() 275 err := <-errCh 276 if err != nil { 277 log.Error(ctx, "bus handler error", "error", err) 278 } 279 err = pipeline.BlockSetState(gst.StateNull) 280 if err != nil { 281 log.Error(ctx, "failed to set pipeline to null state", "error", err) 282 } 283 }() 284 285 videoSrcElem, err := pipeline.GetElementByName("videosrc") 286 if err != nil { 287 return fmt.Errorf("failed to get videosrc element: %w", err) 288 } 289 videoSrc := app.SrcFromElement(videoSrcElem) 290 if videoSrc == nil { 291 return fmt.Errorf("failed to get videosrc element: %w", err) 292 } 293 videoSrc.SetCaps(gst.NewCapsFromString(seg.VideoCaps)) 294 for _, seg := range seg.Video { 295 buf := gst.NewBufferFromBytes(seg.bytes) 296 if seg.pts != nil { 297 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds()))) 298 } else { 299 videoSrc.Error("no video pts", fmt.Errorf("no video pts")) 300 return fmt.Errorf("no video pts") 301 } 302 if seg.dur != nil { 303 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds()))) 304 } 305 ret := videoSrc.PushBuffer(buf) 306 if ret != gst.FlowOK { 307 return fmt.Errorf("failed to push video buffer: %s", ret) 308 } else { 309 // log.Log(ctx, "pushed video buffer", "presentation_timestamp", seg.pts, "duration", seg.dur) 310 } 311 } 312 313 audioSrcElem, err := pipeline.GetElementByName("audiosrc") 314 if err != nil { 315 return fmt.Errorf("failed to get audiosrc element: %w", err) 316 } 317 audioSrc := app.SrcFromElement(audioSrcElem) 318 if audioSrc == nil { 319 return fmt.Errorf("failed to get audiosrc element: %w", err) 320 } 321 audioSrc.SetCaps(gst.NewCapsFromString(seg.AudioCaps)) 322 for _, seg := range seg.Audio { 323 buf := gst.NewBufferFromBytes(seg.bytes) 324 if seg.pts != nil { 325 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds()))) 326 } 327 if seg.dur != nil { 328 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds()))) 329 } 330 ret := audioSrc.PushBuffer(buf) 331 if ret != gst.FlowOK { 332 return fmt.Errorf("failed to push audio buffer: %s", ret) 333 } else { 334 // log.Log(ctx, "pushed audio buffer", "presentation_timestamp", seg.pts, "duration", seg.dur) 335 } 336 } 337 338 videoSrc.EndStream() 339 audioSrc.EndStream() 340 mp4sinkElem, err := pipeline.GetElementByName("mp4sink") 341 if err != nil { 342 return fmt.Errorf("failed to get mp4sink element: %w", err) 343 } 344 mp4sink := app.SinkFromElement(mp4sinkElem) 345 if mp4sink == nil { 346 return fmt.Errorf("failed to get mp4sink element: %w", err) 347 } 348 mp4sink.SetCallbacks(&app.SinkCallbacks{ 349 NewSampleFunc: WriterNewSample(ctx, output), 350 }) 351 352 pipeline.SetState(gst.StatePlaying) 353 354 <-ctx.Done() 355 356 return <-errCh 357}