Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "strings"
10 "time"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "github.com/google/uuid"
15 "stream.place/streamplace/pkg/log"
16)
17
// SegmentBuffer is one demuxed media buffer together with the timing metadata
// that was read from it.
type SegmentBuffer struct {
	// bytes is the raw payload of the buffer.
	bytes []byte
	// pts is the presentation timestamp. It may be nil; code in this file
	// checks for nil before using it.
	pts *time.Duration
	// dur is the buffer duration. It may be nil; the muxing code only sets a
	// duration on the outgoing buffer when this is non-nil.
	dur *time.Duration
}
23
24type SegmentData struct {
25 Audio []SegmentBuffer
26 AudioCaps string
27 Video []SegmentBuffer
28 VideoCaps string
29}
30
31func SmearAudioTimestamps(ctx context.Context, input io.Reader, output io.Writer) error {
32 bs, err := io.ReadAll(input)
33 if err != nil {
34 return err
35 }
36 seg, err := ToBuffers(ctx, bytes.NewReader(bs))
37 if err != nil {
38 // Write the input bytes to a file for debugging
39 debugFile := fmt.Sprintf("audio_smear_debug_%s.mp4", uuid.New().String())
40 err = os.WriteFile(debugFile, bs, 0644)
41 if err != nil {
42 log.Log(ctx, "failed to write debug file", "error", err, "path", debugFile)
43 } else {
44 log.Log(ctx, "wrote debug file", "path", debugFile)
45 }
46 return err
47 }
48
49 err = seg.Normalize(ctx)
50 if err != nil {
51 return err
52 }
53
54 return JoinAudioVideo(ctx, seg, output)
55}
56
57func (s *SegmentData) Normalize(ctx context.Context) error {
58 if len(s.Video) == 0 {
59 return fmt.Errorf("no video segments")
60 }
61 if len(s.Audio) == 0 {
62 return fmt.Errorf("no audio segments")
63 }
64
65 lastVideo := s.Video[len(s.Video)-1]
66 lastAudio := s.Audio[len(s.Audio)-1]
67
68 if lastVideo.pts == nil {
69 return fmt.Errorf("last video segment has no pts")
70 }
71 if lastAudio.pts == nil {
72 return fmt.Errorf("last audio segment has no pts")
73 }
74
75 videoEnd := lastVideo.pts.Nanoseconds() + lastVideo.dur.Nanoseconds()
76 audioEnd := lastAudio.pts.Nanoseconds() + lastAudio.dur.Nanoseconds()
77
78 diff := videoEnd - audioEnd
79 diffPerAudio := diff / int64(len(s.Audio)-1)
80 for i, audio := range s.Audio {
81 newPts := time.Duration(audio.pts.Nanoseconds() + (diffPerAudio * int64(i)))
82 audio.pts = &newPts
83 s.Audio[i] = audio
84 }
85
86 lastVideo = s.Video[len(s.Video)-1]
87 lastAudio = s.Audio[len(s.Audio)-1]
88
89 return nil
90}
91
92func ToBuffers(ctx context.Context, input io.Reader) (*SegmentData, error) {
93 ctx = log.WithLogValues(ctx, "func", "SplitAudioVideo")
94
95 pipelineSlice := []string{
96 "appsrc name=mp4src ! qtdemux name=demux",
97 "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! appsink sync=false name=videoappsink",
98 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink",
99 }
100
101 ctx, cancel := context.WithCancel(ctx)
102 defer cancel()
103
104 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
105 if err != nil {
106 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err)
107 }
108
109 errCh := make(chan error)
110 go func() {
111 err := HandleBusMessages(ctx, pipeline)
112 cancel()
113 errCh <- err
114 close(errCh)
115 }()
116
117 defer func() {
118 err := <-errCh
119 if err != nil {
120 log.Error(ctx, "bus handler error", "error", err)
121 }
122 err = pipeline.BlockSetState(gst.StateNull)
123 if err != nil {
124 log.Error(ctx, "failed to set pipeline to null state", "error", err)
125 }
126 }()
127
128 mp4src, err := pipeline.GetElementByName("mp4src")
129 if err != nil {
130 return nil, fmt.Errorf("failed to get mp4src element: %w", err)
131 }
132 src := app.SrcFromElement(mp4src)
133 if src == nil {
134 return nil, fmt.Errorf("failed to get mp4src element: %w", err)
135 }
136 src.SetCallbacks(&app.SourceCallbacks{
137 NeedDataFunc: ReaderNeedData(ctx, input),
138 })
139
140 seg := SegmentData{
141 Audio: []SegmentBuffer{},
142 Video: []SegmentBuffer{},
143 }
144
145 audioSinkElem, err := pipeline.GetElementByName("audioappsink")
146 if err != nil {
147 return nil, fmt.Errorf("failed to get audioappsink element: %w", err)
148 }
149 audioSink := app.SinkFromElement(audioSinkElem)
150 if audioSink == nil {
151 return nil, fmt.Errorf("failed to get audioappsink element: %w", err)
152 }
153
154 audioSink.SetCallbacks(&app.SinkCallbacks{
155 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
156 sample := sink.PullSample()
157 if sample == nil {
158 return gst.FlowOK
159 }
160
161 // Retrieve the buffer from the sample.
162 buffer := sample.GetBuffer()
163 // log.Log(ctx, "audio buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration(), "dts", buffer.DecodingTimestamp())
164 bs := buffer.Map(gst.MapRead).Bytes()
165 defer buffer.Unmap()
166 sinkPads, err := sink.GetSinkPads()
167 if err != nil {
168 src.Error("could not get sink pads", err)
169 return gst.FlowError
170 }
171 caps := sinkPads[0].GetCurrentCaps()
172 if caps != nil {
173 seg.AudioCaps = caps.String()
174 }
175
176 seg.Audio = append(seg.Audio, SegmentBuffer{
177 bytes: bs,
178 pts: buffer.PresentationTimestamp().AsDuration(),
179 dur: buffer.Duration().AsDuration(),
180 })
181
182 return gst.FlowOK
183 },
184 })
185
186 videoSinkElem, err := pipeline.GetElementByName("videoappsink")
187 if err != nil {
188 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
189 }
190 videoSink := app.SinkFromElement(videoSinkElem)
191 if videoSink == nil {
192 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
193 }
194 videoSink.SetCallbacks(&app.SinkCallbacks{
195 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
196 sample := sink.PullSample()
197 if sample == nil {
198 return gst.FlowOK
199 }
200
201 // Retrieve the buffer from the sample.
202 buffer := sample.GetBuffer()
203 // log.Log(ctx, "video buffer", "presentation_timestamp", buffer.PresentationTimestamp(), "duration", buffer.Duration())
204 bs := buffer.Map(gst.MapRead).Bytes()
205 defer buffer.Unmap()
206 sinkPads, err := sink.GetSinkPads()
207 if err != nil {
208 src.Error("could not get sink pads", err)
209 return gst.FlowError
210 }
211 caps := sinkPads[0].GetCurrentCaps()
212 if caps != nil {
213 seg.VideoCaps = caps.String()
214 }
215
216 sb := SegmentBuffer{
217 bytes: bs,
218 pts: buffer.PresentationTimestamp().AsDuration(),
219 dur: buffer.Duration().AsDuration(),
220 }
221
222 // log.Log(ctx, "video buffer", "presentation_timestamp", sb.pts, "duration", sb.dur)
223 if sb.pts == nil {
224 sink.Error("no video pts", fmt.Errorf("no video pts"))
225 return gst.FlowError
226 }
227
228 seg.Video = append(seg.Video, sb)
229
230 return gst.FlowOK
231 },
232 })
233
234 if err := pipeline.SetState(gst.StatePlaying); err != nil {
235 return nil, fmt.Errorf("failed to set pipeline state: %w", err)
236 }
237
238 <-ctx.Done()
239
240 return &seg, <-errCh
241}
242
243func JoinAudioVideo(ctx context.Context, seg *SegmentData, output io.Writer) error {
244 uu, _ := uuid.NewV7()
245 ctx = log.WithLogValues(ctx, "func", "JoinAudioVideo", "uuid", uu.String())
246
247 pipelineSlice := []string{
248 "mp4mux name=mux ! appsink sync=false name=mp4sink",
249 "appsrc name=videosrc format=time ! queue ! mux.video_0",
250 "appsrc name=audiosrc format=time ! queue ! mux.audio_0",
251 }
252
253 ctx, cancel := context.WithCancel(ctx)
254 defer cancel()
255
256 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
257 if err != nil {
258 return fmt.Errorf("failed to create GStreamer pipeline: %w", err)
259 }
260
261 errCh := make(chan error)
262 go func() {
263 err := HandleBusMessages(ctx, pipeline)
264 cancel()
265 errCh <- err
266 close(errCh)
267 }()
268
269 defer func() {
270 err := <-errCh
271 if err != nil {
272 log.Error(ctx, "bus handler error", "error", err)
273 }
274 err = pipeline.BlockSetState(gst.StateNull)
275 if err != nil {
276 log.Error(ctx, "failed to set pipeline to null state", "error", err)
277 }
278 }()
279
280 videoSrcElem, err := pipeline.GetElementByName("videosrc")
281 if err != nil {
282 return fmt.Errorf("failed to get videosrc element: %w", err)
283 }
284 videoSrc := app.SrcFromElement(videoSrcElem)
285 if videoSrc == nil {
286 return fmt.Errorf("failed to get videosrc element: %w", err)
287 }
288 videoSrc.SetCaps(gst.NewCapsFromString(seg.VideoCaps))
289 for _, seg := range seg.Video {
290 buf := gst.NewBufferFromBytes(seg.bytes)
291 if seg.pts != nil {
292 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds())))
293 } else {
294 videoSrc.Error("no video pts", fmt.Errorf("no video pts"))
295 return fmt.Errorf("no video pts")
296 }
297 if seg.dur != nil {
298 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds())))
299 }
300 ret := videoSrc.PushBuffer(buf)
301 if ret != gst.FlowOK {
302 return fmt.Errorf("failed to push video buffer: %s", ret)
303 }
304 }
305
306 audioSrcElem, err := pipeline.GetElementByName("audiosrc")
307 if err != nil {
308 return fmt.Errorf("failed to get audiosrc element: %w", err)
309 }
310 audioSrc := app.SrcFromElement(audioSrcElem)
311 if audioSrc == nil {
312 return fmt.Errorf("failed to get audiosrc element: %w", err)
313 }
314 audioSrc.SetCaps(gst.NewCapsFromString(seg.AudioCaps))
315 for _, seg := range seg.Audio {
316 buf := gst.NewBufferFromBytes(seg.bytes)
317 if seg.pts != nil {
318 buf.SetPresentationTimestamp(gst.ClockTime(uint64(seg.pts.Nanoseconds())))
319 }
320 if seg.dur != nil {
321 buf.SetDuration(gst.ClockTime(uint64(seg.dur.Nanoseconds())))
322 }
323 ret := audioSrc.PushBuffer(buf)
324 if ret != gst.FlowOK {
325 return fmt.Errorf("failed to push audio buffer: %s", ret)
326 }
327 }
328
329 videoSrc.EndStream()
330 audioSrc.EndStream()
331 mp4sinkElem, err := pipeline.GetElementByName("mp4sink")
332 if err != nil {
333 return fmt.Errorf("failed to get mp4sink element: %w", err)
334 }
335 mp4sink := app.SinkFromElement(mp4sinkElem)
336 if mp4sink == nil {
337 return fmt.Errorf("failed to get mp4sink element: %w", err)
338 }
339 mp4sink.SetCallbacks(&app.SinkCallbacks{
340 NewSampleFunc: WriterNewSample(ctx, output),
341 })
342
343 if err := pipeline.SetState(gst.StatePlaying); err != nil {
344 return fmt.Errorf("failed to set pipeline state: %w", err)
345 }
346
347 <-ctx.Done()
348
349 return <-errCh
350}